From 0478a09f257cba17565e7b62c8a7b62f46aaf206 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Mon, 30 Oct 2023 05:24:36 +0000 Subject: [PATCH 01/11] Replace fs with fsspec --- fugue/__init__.py | 1 - fugue/_utils/io.py | 62 +++++++++---------- fugue/dataframe/utils.py | 37 ++++------- fugue/execution/execution_engine.py | 7 --- fugue/execution/native_execution_engine.py | 16 ++--- fugue/workflow/_checkpoint.py | 18 +++--- fugue_dask/_io.py | 27 ++++---- fugue_dask/execution_engine.py | 17 ++--- fugue_duckdb/_io.py | 47 +++++--------- fugue_duckdb/execution_engine.py | 9 +-- fugue_ibis/execution_engine.py | 6 +- fugue_ray/_utils/io.py | 28 ++++----- fugue_spark/_utils/io.py | 6 +- fugue_spark/execution_engine.py | 9 +-- fugue_test/builtin_suite.py | 24 +++---- fugue_test/execution_suite.py | 29 ++++----- tests/fugue/dataframe/test_utils.py | 6 +- tests/fugue/utils/test_io.py | 20 +++--- .../workflow/test_workflow_determinism.py | 2 +- tests/fugue_dask/test_io.py | 8 +-- tests/fugue_ray/test_execution_engine.py | 12 ++-- tests/fugue_spark/utils/test_io.py | 29 ++++----- 22 files changed, 165 insertions(+), 255 deletions(-) diff --git a/fugue/__init__.py b/fugue/__init__.py index 3de97585..5824aeda 100644 --- a/fugue/__init__.py +++ b/fugue/__init__.py @@ -1,6 +1,5 @@ # flake8: noqa from triad.collections import Schema -from triad.collections.fs import FileSystem from fugue.api import out_transform, transform from fugue.bag.array_bag import ArrayBag diff --git a/fugue/_utils/io.py b/fugue/_utils/io.py index d3d89eb7..61db1912 100644 --- a/fugue/_utils/io.py +++ b/fugue/_utils/io.py @@ -3,12 +3,12 @@ from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union from urllib.parse import urlparse -import fs as pfs import pandas as pd +from fsspec import AbstractFileSystem from triad.collections.dict import ParamDict -from triad.collections.fs import FileSystem from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw +from triad.utils.io import join, url_to_fs from triad.utils.pandas_like import PD_UTILS from fugue.dataframe import LocalBoundedDataFrame, LocalDataFrame, PandasDataFrame @@ -32,7 +32,7 @@ def __init__(self, path: str, format_hint: Optional[str] = None): else: self._uri = urlparse(path[:last]) self._glob_pattern = path[last + 1 :] - self._path = pfs.path.combine(self._uri.path, self._glob_pattern) + self._path = join(self._uri.path, self._glob_pattern) if format_hint is None or format_hint == "": for k, v in _FORMAT_MAP.items(): @@ -54,9 +54,23 @@ def assert_no_glob(self) -> "FileParser": def with_glob(self, glob: str, format_hint: Optional[str] = None) -> "FileParser": uri = self.uri if glob != "": - uri = pfs.path.combine(uri, glob) + uri = join(uri, glob) return FileParser(uri, format_hint or self._orig_format_hint) + def find_all(self, fs: Optional[AbstractFileSystem] = None) -> List[str]: + if self.glob_pattern == "": + return [self.uri] + else: + if fs is None: + return self.fs.glob(self._path) + else: + return fs.glob(self._path) + + @property + def fs(self) -> AbstractFileSystem: + _fs, _ = url_to_fs(self.uri) + return _fs + @property def glob_pattern(self) -> str: return self._glob_pattern @@ -69,7 +83,7 @@ def uri(self) -> str: def uri_with_glob(self) -> str: if self.glob_pattern == "": return self.uri - return pfs.path.combine(self.uri, self.glob_pattern) + return join(self.uri, self.glob_pattern) @property def parent(self) -> str: @@ -97,7 +111,7 @@ def load_df( uri: Union[str, List[str]], format_hint: Optional[str] = 
None, columns: Any = None, - fs: Optional[FileSystem] = None, + fs: Optional[AbstractFileSystem] = None, **kwargs: Any, ) -> LocalBoundedDataFrame: if isinstance(uri, str): @@ -117,7 +131,7 @@ def save_df( uri: str, format_hint: Optional[str] = None, mode: str = "overwrite", - fs: Optional[FileSystem] = None, + fs: Optional[AbstractFileSystem] = None, **kwargs: Any, ) -> None: assert_or_throw( @@ -125,30 +139,22 @@ def save_df( ) p = FileParser(uri, format_hint).assert_no_glob() if fs is None: - fs = FileSystem() + fs, _ = url_to_fs(uri) if fs.exists(uri): assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: - fs.remove(uri) + fs.rm(uri, recursive=True) except Exception: - try: - fs.removetree(uri) - except Exception: # pragma: no cover - pass + pass _FORMAT_SAVE[p.file_format](df, p, **kwargs) def _get_single_files( - fp: Iterable[FileParser], fs: Optional[FileSystem] + fp: Iterable[FileParser], fs: Optional[AbstractFileSystem] ) -> Iterable[FileParser]: - if fs is None: - fs = FileSystem() for f in fp: if f.glob_pattern != "": - files = [ - FileParser(pfs.path.combine(f.uri, pfs.path.basename(x.path))) - for x in fs.opendir(f.uri).glob(f.glob_pattern) - ] + files = [FileParser(x) for x in f.find_all(fs)] yield from _get_single_files(files, fs) else: yield f @@ -189,12 +195,9 @@ def _save_csv(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: def _safe_load_csv(path: str, **kwargs: Any) -> pd.DataFrame: def load_dir() -> pd.DataFrame: - fs = FileSystem() + fs, _ = url_to_fs(path) return pd.concat( - [ - pd.read_csv(pfs.path.combine(path, pfs.path.basename(x.path)), **kwargs) - for x in fs.opendir(path).glob("*.csv") - ] + [pd.read_csv(x, **kwargs) for x in fs.glob(join(path, "*.csv"))] ) try: @@ -257,13 +260,8 @@ def _safe_load_json(path: str, **kwargs: Any) -> pd.DataFrame: try: return pd.read_json(path, **kw) except (IsADirectoryError, PermissionError): - fs = FileSystem() - return pd.concat( - [ - pd.read_json(pfs.path.combine(path, pfs.path.basename(x.path)), **kw) - for x in fs.opendir(path).glob("*.json") - ] - ) + fs, _ = url_to_fs(path) + return pd.concat([pd.read_json(x, **kw) for x in fs.glob(join(path, "*.json"))]) def _load_json( diff --git a/fugue/dataframe/utils.py b/fugue/dataframe/utils.py index ffd7c3f8..2afe32d2 100644 --- a/fugue/dataframe/utils.py +++ b/fugue/dataframe/utils.py @@ -1,15 +1,15 @@ -import os import pickle -from typing import Any, Iterable, Optional, Tuple, List, Dict +from typing import Any, Dict, Iterable, List, Optional, Tuple import pandas as pd import pyarrow as pa -from fs import open_fs -from triad import FileSystem, Schema, assert_or_throw +from fsspec import AbstractFileSystem +from triad import Schema, assert_or_throw from triad.collections.schema import SchemaError from triad.exceptions import InvalidOperationError from triad.utils.assertion import assert_arg_not_none from triad.utils.assertion import assert_or_throw as aot +from triad.utils.io import url_to_fs from triad.utils.pyarrow import pa_batch_to_dicts from .api import as_fugue_df, get_column_names, normalize_column_names, rename @@ -112,7 +112,6 @@ def serialize_df( df: Optional[DataFrame], threshold: int = -1, file_path: Optional[str] = None, - fs: Optional[FileSystem] = None, ) -> Optional[bytes]: """Serialize input dataframe to base64 string or to file if it's larger than threshold @@ -121,15 +120,8 @@ def serialize_df( :param threshold: file byte size threshold, defaults to -1 :param file_path: file path to store the data (used only if the serialized data is 
larger than ``threshold``), defaults to None - :param fs: :class:`~triad:triad.collections.fs.FileSystem`, defaults to None :raises InvalidOperationError: if file is large but ``file_path`` is not provided :return: a pickled blob either containing the data or the file path - - .. note:: - - If fs is not provided but it needs to write to disk, then it will use - :meth:`~fs:fs.opener.registry.Registry.open_fs` to try to open the file to - write. """ if df is None: return None @@ -140,24 +132,20 @@ def serialize_df( else: if file_path is None: raise InvalidOperationError("file_path is not provided") - if fs is None: - with open_fs( - os.path.dirname(file_path), writeable=True, create=False - ) as _fs: - _fs.writebytes(os.path.basename(file_path), data) - else: - fs.writebytes(file_path, data) + fs, path = url_to_fs(file_path) + with fs.open(path, "wb") as f: + f.write(data) return pickle.dumps(file_path) def deserialize_df( - data: Optional[bytes], fs: Optional[FileSystem] = None + data: Optional[bytes], fs: Optional[AbstractFileSystem] = None ) -> Optional[LocalBoundedDataFrame]: """Deserialize json string to :class:`~fugue.dataframe.dataframe.LocalBoundedDataFrame` :param json_str: json string containing the base64 data or a file path - :param fs: :class:`~triad:triad.collections.fs.FileSystem`, defaults to None + :param fs: the file system to use, defaults to None :raises ValueError: if the json string is invalid, not generated from :func:`~.serialize_df` :return: :class:`~fugue.dataframe.dataframe.LocalBoundedDataFrame` if ``json_str`` @@ -169,10 +157,9 @@ def deserialize_df( if isinstance(obj, LocalBoundedDataFrame): return obj elif isinstance(obj, str): - if fs is None: - with open_fs(os.path.dirname(obj), create=False) as _fs: - return pickle.loads(_fs.readbytes(os.path.basename(obj))) - return pickle.loads(fs.readbytes(obj)) + fs, path = url_to_fs(obj) + with fs.open(path, "rb") as f: + return pickle.load(f) raise ValueError("data is invalid") diff --git a/fugue/execution/execution_engine.py b/fugue/execution/execution_engine.py index 0f6ccd3a..468ab097 100644 --- a/fugue/execution/execution_engine.py +++ b/fugue/execution/execution_engine.py @@ -18,7 +18,6 @@ from uuid import uuid4 from triad import ParamDict, Schema, SerializableRLock, assert_or_throw, to_uuid -from triad.collections.fs import FileSystem from triad.collections.function_wrapper import AnnotatedParam from triad.exceptions import InvalidOperationError from triad.utils.convert import to_size @@ -471,12 +470,6 @@ def set_sql_engine(self, engine: SQLEngine) -> None: """ self._sql_engine = engine - @property - @abstractmethod - def fs(self) -> FileSystem: # pragma: no cover - """File system of this engine instance""" - raise NotImplementedError - @abstractmethod def create_default_map_engine(self) -> MapEngine: # pragma: no cover """Default MapEngine if user doesn't specify""" diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py index 6cae5f10..c6d91d38 100644 --- a/fugue/execution/native_execution_engine.py +++ b/fugue/execution/native_execution_engine.py @@ -1,12 +1,13 @@ import logging import os from typing import Any, Callable, Dict, List, Optional, Type, Union + import numpy as np import pandas as pd from triad import Schema from triad.collections.dict import IndexedOrderedDict -from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw +from triad.utils.io import makedirs from triad.utils.pandas_like import PandasUtils from 
fugue._utils.io import load_df, save_df @@ -179,7 +180,6 @@ class NativeExecutionEngine(ExecutionEngine): def __init__(self, conf: Any = None): super().__init__(conf) - self._fs = FileSystem() self._log = logging.getLogger() def __repr__(self) -> str: @@ -189,10 +189,6 @@ def __repr__(self) -> str: def log(self) -> logging.Logger: return self._log - @property - def fs(self) -> FileSystem: - return self._fs - @property def is_distributed(self) -> bool: return False @@ -395,9 +391,7 @@ def load_df( **kwargs: Any, ) -> LocalBoundedDataFrame: return self.to_df( - load_df( - path, format_hint=format_hint, columns=columns, fs=self.fs, **kwargs - ) + load_df(path, format_hint=format_hint, columns=columns, **kwargs) ) def save_df( @@ -413,9 +407,9 @@ def save_df( partition_spec = partition_spec or PartitionSpec() if not force_single and not partition_spec.empty: kwargs["partition_cols"] = partition_spec.partition_by - self.fs.makedirs(os.path.dirname(path), recreate=True) + makedirs(os.path.dirname(path), exist_ok=True) df = self.to_df(df) - save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) + save_df(df, path, format_hint=format_hint, mode=mode, **kwargs) @fugue_annotated_param(NativeExecutionEngine) diff --git a/fugue/workflow/_checkpoint.py b/fugue/workflow/_checkpoint.py index 2cc25039..382244cc 100644 --- a/fugue/workflow/_checkpoint.py +++ b/fugue/workflow/_checkpoint.py @@ -1,14 +1,15 @@ from typing import Any -import fs as pfs +from triad.utils.assertion import assert_or_throw +from triad.utils.hash import to_uuid +from triad.utils.io import exists, join, makedirs, rm + from fugue.collections.partition import PartitionSpec from fugue.collections.yielded import PhysicalYielded from fugue.constants import FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH from fugue.dataframe import DataFrame from fugue.exceptions import FugueWorkflowCompileError, FugueWorkflowRuntimeError from fugue.execution.execution_engine import ExecutionEngine -from triad.utils.assertion import assert_or_throw -from triad.utils.hash import to_uuid class Checkpoint(object): @@ -130,7 +131,6 @@ def is_null(self) -> bool: class CheckpointPath(object): def __init__(self, engine: ExecutionEngine): self._engine = engine - self._fs = engine.fs self._log = engine.log self._path = engine.conf.get(FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH, "").strip() self._temp_path = "" @@ -143,14 +143,14 @@ def init_temp_path(self, execution_id: str) -> str: if self._path == "": self._temp_path = "" return "" - self._temp_path = pfs.path.combine(self._path, execution_id) - self._fs.makedirs(self._temp_path, recreate=True) + self._temp_path = join(self._path, execution_id) + makedirs(self._temp_path, exist_ok=True) return self._temp_path def remove_temp_path(self): if self._temp_path != "": try: - self._fs.removetree(self._temp_path) + rm(self._temp_path, recursive=True) except Exception as e: # pragma: no cover self._log.info("Unable to remove " + self._temp_path, e) @@ -162,7 +162,7 @@ def get_temp_file(self, obj_id: str, permanent: bool) -> str: f"{FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH} is not set" ), ) - return pfs.path.combine(path, obj_id + ".parquet") + return join(path, obj_id + ".parquet") def get_table_name(self, obj_id: str, permanent: bool) -> str: path = self._path if permanent else self._temp_path @@ -170,6 +170,6 @@ def get_table_name(self, obj_id: str, permanent: bool) -> str: def temp_file_exists(self, path: str) -> bool: try: - return self._fs.exists(path) + return exists(path) except Exception: # pragma: no cover return 
False diff --git a/fugue_dask/_io.py b/fugue_dask/_io.py index e4d0001b..812be54a 100644 --- a/fugue_dask/_io.py +++ b/fugue_dask/_io.py @@ -1,13 +1,13 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Union import fsspec -import fs as pfs import pandas as pd from dask import dataframe as dd +from fsspec import AbstractFileSystem from triad.collections.dict import ParamDict -from triad.collections.fs import FileSystem from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw +from triad.utils.io import join, url_to_fs from fugue._utils.io import FileParser, _get_single_files from fugue_dask.dataframe import DaskDataFrame @@ -19,7 +19,7 @@ def load_df( uri: Union[str, List[str]], format_hint: Optional[str] = None, columns: Any = None, - fs: Optional[FileSystem] = None, + fs: Optional[AbstractFileSystem] = None, **kwargs: Any, ) -> DaskDataFrame: if isinstance(uri, str): @@ -39,7 +39,7 @@ def save_df( uri: str, format_hint: Optional[str] = None, mode: str = "overwrite", - fs: Optional[FileSystem] = None, + fs: Optional[AbstractFileSystem] = None, **kwargs: Any, ) -> None: assert_or_throw( @@ -48,16 +48,13 @@ def save_df( ) p = FileParser(uri, format_hint).assert_no_glob() if fs is None: - fs = FileSystem() + fs, _ = url_to_fs(uri) if fs.exists(uri): assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: - fs.remove(uri) - except Exception: - try: - fs.removetree(uri) - except Exception: # pragma: no cover - pass + fs.rm(uri, recursive=True) + except Exception: # pragma: no cover + pass _FORMAT_SAVE[p.file_format](df, p, **kwargs) @@ -100,7 +97,7 @@ def _save_csv(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: fs, path = fsspec.core.url_to_fs(p.uri) fs.makedirs(path, exist_ok=True) df.native.to_csv( - pfs.path.combine(p.uri, "*.csv"), **{"index": False, "header": False, **kwargs} + join(p.uri, "*.csv"), **{"index": False, "header": False, **kwargs} ) @@ -108,7 +105,7 @@ def _safe_load_csv(path: str, **kwargs: Any) -> dd.DataFrame: try: return dd.read_csv(path, **kwargs) except (IsADirectoryError, PermissionError): - return dd.read_csv(pfs.path.combine(path, "*.csv"), **kwargs) + return dd.read_csv(join(path, "*.csv"), **kwargs) def _load_csv( # noqa: C901 @@ -150,14 +147,14 @@ def _load_csv( # noqa: C901 def _save_json(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: fs, path = fsspec.core.url_to_fs(p.uri) fs.makedirs(path, exist_ok=True) - df.native.to_json(pfs.path.combine(p.uri, "*.json"), **kwargs) + df.native.to_json(join(p.uri, "*.json"), **kwargs) def _safe_load_json(path: str, **kwargs: Any) -> dd.DataFrame: try: return dd.read_json(path, **kwargs) except (IsADirectoryError, PermissionError): - x = dd.read_json(pfs.path.combine(path, "*.json"), **kwargs) + x = dd.read_json(join(path, "*.json"), **kwargs) print(x.compute()) return x diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index abc07ea5..7dd0bae0 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -7,18 +7,17 @@ from distributed import Client from triad.collections import Schema from triad.collections.dict import IndexedOrderedDict, ParamDict -from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw from triad.utils.hash import to_uuid from triad.utils.pandas_like import PandasUtils from triad.utils.threading import RunOnce +from triad.utils.io import makedirs from fugue import StructuredRawSQL from fugue.collections.partition import ( PartitionCursor, 
PartitionSpec, parse_presort_exp, ) -from fugue.exceptions import FugueBug from fugue.constants import KEYWORD_PARALLELISM, KEYWORD_ROWCOUNT from fugue.dataframe import ( AnyDataFrame, @@ -28,6 +27,7 @@ PandasDataFrame, ) from fugue.dataframe.utils import get_join_schemas +from fugue.exceptions import FugueBug from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine from fugue.execution.native_execution_engine import NativeExecutionEngine from fugue_dask._constants import FUGUE_DASK_DEFAULT_CONF @@ -206,7 +206,6 @@ def __init__(self, dask_client: Optional[Client] = None, conf: Any = None): p = ParamDict(FUGUE_DASK_DEFAULT_CONF) p.update(ParamDict(conf)) super().__init__(p) - self._fs = FileSystem() self._log = logging.getLogger() self._client = DASK_UTILS.get_or_create_client(dask_client) self._native = NativeExecutionEngine(conf=conf) @@ -227,10 +226,6 @@ def dask_client(self) -> Client: def log(self) -> logging.Logger: return self._log - @property - def fs(self) -> FileSystem: - return self._fs - def create_default_sql_engine(self) -> SQLEngine: return DaskSQLEngine(self) @@ -527,9 +522,7 @@ def load_df( **kwargs: Any, ) -> DaskDataFrame: return self.to_df( - load_df( - path, format_hint=format_hint, columns=columns, fs=self.fs, **kwargs - ) + load_df(path, format_hint=format_hint, columns=columns, **kwargs) ) def save_df( @@ -556,9 +549,9 @@ def save_df( else: if not partition_spec.empty: kwargs["partition_on"] = partition_spec.partition_by - self.fs.makedirs(os.path.dirname(path), recreate=True) + makedirs(os.path.dirname(path), exist_ok=True) df = self.to_df(df) - save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) + save_df(df, path, format_hint=format_hint, mode=mode, **kwargs) def to_dask_engine_df(df: Any, schema: Any = None) -> DaskDataFrame: diff --git a/fugue_duckdb/_io.py b/fugue_duckdb/_io.py index 7d444fe5..1c28e685 100644 --- a/fugue_duckdb/_io.py +++ b/fugue_duckdb/_io.py @@ -3,9 +3,9 @@ from duckdb import DuckDBPyConnection from triad import ParamDict, Schema -from triad.collections.fs import FileSystem -from triad.utils.assertion import assert_or_throw +from triad.utils.assertion import assert_or_throw +from triad.utils.io import isdir, makedirs, rm, exists from fugue._utils.io import FileParser, load_df, save_df from fugue.collections.sql import TempTableName from fugue.dataframe import ArrowDataFrame, LocalBoundedDataFrame @@ -18,26 +18,17 @@ from fugue_duckdb.dataframe import DuckDataFrame -def _get_single_files( - fp: Iterable[FileParser], fs: FileSystem, fmt: str -) -> Iterable[FileParser]: - def _isdir(d: str) -> bool: - try: - return fs.isdir(d) - except Exception: # pragma: no cover - return False - +def _get_single_files(fp: Iterable[FileParser], fmt: str) -> Iterable[FileParser]: for f in fp: - if f.glob_pattern == "" and _isdir(f.uri): + if f.glob_pattern == "" and isdir(f.uri): yield f.with_glob("*." 
+ fmt, fmt) else: yield f class DuckDBIO: - def __init__(self, fs: FileSystem, con: DuckDBPyConnection) -> None: + def __init__(self, con: DuckDBPyConnection) -> None: self._con = con - self._fs = fs self._format_load = {"csv": self._load_csv, "parquet": self._load_parquet} self._format_save = {"csv": self._save_csv, "parquet": self._save_parquet} @@ -55,11 +46,9 @@ def load_df( else: fp = [FileParser(u, format_hint) for u in uri] if fp[0].file_format not in self._format_load: - return load_df( - uri, format_hint=format_hint, columns=columns, fs=self._fs, **kwargs - ) + return load_df(uri, format_hint=format_hint, columns=columns, **kwargs) dfs: List[DuckDataFrame] = [] - for f in _get_single_files(fp, self._fs, fp[0].file_format): + for f in _get_single_files(fp, fp[0].file_format): df = self._format_load[f.file_format](f, columns, **kwargs) dfs.append(df) rel = dfs[0].native @@ -83,23 +72,17 @@ def save_df( ) p = FileParser(uri, format_hint).assert_no_glob() if (p.file_format not in self._format_save) or ("partition_cols" in kwargs): - self._fs.makedirs(os.path.dirname(uri), recreate=True) + makedirs(os.path.dirname(uri), exist_ok=True) ldf = ArrowDataFrame(df.as_arrow()) - return save_df( - ldf, uri=uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs - ) - fs = self._fs - if fs.exists(uri): + return save_df(ldf, uri=uri, format_hint=format_hint, mode=mode, **kwargs) + if exists(uri): assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: - fs.remove(uri) - except Exception: - try: - fs.removetree(uri) - except Exception: # pragma: no cover - pass - if not fs.exists(p.parent): - fs.makedirs(p.parent, recreate=True) + rm(uri, recursive=True) + except Exception: # pragma: no cover + pass + if not exists(p.parent): + makedirs(p.parent, exist_ok=True) self._format_save[p.file_format](df, p, **kwargs) def _save_csv(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py index ca67b43f..5d6070b3 100644 --- a/fugue_duckdb/execution_engine.py +++ b/fugue_duckdb/execution_engine.py @@ -4,7 +4,6 @@ import duckdb from duckdb import DuckDBPyConnection, DuckDBPyRelation from triad import SerializableRLock -from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw from triad.utils.schema import quote_name @@ -195,10 +194,6 @@ def connection(self) -> DuckDBPyConnection: def log(self) -> logging.Logger: return self._native_engine.log - @property - def fs(self) -> FileSystem: - return self._native_engine.fs - def create_default_sql_engine(self) -> SQLEngine: return DuckDBEngine(self) @@ -488,7 +483,7 @@ def load_df( columns: Any = None, **kwargs: Any, ) -> LocalBoundedDataFrame: - dio = DuckDBIO(self.fs, self.connection) + dio = DuckDBIO(self.connection) return dio.load_df(path, format_hint, columns, **kwargs) def save_df( @@ -504,7 +499,7 @@ def save_df( partition_spec = partition_spec or PartitionSpec() if not partition_spec.empty and not force_single: kwargs["partition_cols"] = partition_spec.partition_by - dio = DuckDBIO(self.fs, self.connection) + dio = DuckDBIO(self.connection) dio.save_df(_to_duck_df(self, df), path, format_hint, mode, **kwargs) def convert_yield_dataframe(self, df: DataFrame, as_local: bool) -> DataFrame: diff --git a/fugue_ibis/execution_engine.py b/fugue_ibis/execution_engine.py index cdc532b0..1c8b3c4c 100644 --- a/fugue_ibis/execution_engine.py +++ b/fugue_ibis/execution_engine.py @@ -5,7 +5,7 @@ import ibis from ibis import BaseBackend 
-from triad import FileSystem, assert_or_throw +from triad import assert_or_throw from fugue import StructuredRawSQL from fugue.bag import Bag, LocalBag @@ -375,10 +375,6 @@ def create_default_map_engine(self) -> MapEngine: def log(self) -> logging.Logger: return self.non_ibis_engine.log - @property - def fs(self) -> FileSystem: - return self.non_ibis_engine.fs - def get_current_parallelism(self) -> int: return self.non_ibis_engine.get_current_parallelism() diff --git a/fugue_ray/_utils/io.py b/fugue_ray/_utils/io.py index 6198b81c..eaafd687 100644 --- a/fugue_ray/_utils/io.py +++ b/fugue_ray/_utils/io.py @@ -4,23 +4,24 @@ import pyarrow as pa import ray.data as rd -from fugue import ExecutionEngine -from fugue._utils.io import FileParser, save_df -from fugue.collections.partition import PartitionSpec -from fugue.dataframe import DataFrame -from fugue_ray.dataframe import RayDataFrame from pyarrow import csv as pacsv from pyarrow import json as pajson from ray.data.datasource import FileExtensionFilter from triad.collections import Schema from triad.collections.dict import ParamDict from triad.utils.assertion import assert_or_throw +from triad.utils.io import exists, makedirs, rm + +from fugue import ExecutionEngine +from fugue._utils.io import FileParser, save_df +from fugue.collections.partition import PartitionSpec +from fugue.dataframe import DataFrame +from fugue_ray.dataframe import RayDataFrame class RayIO(object): def __init__(self, engine: ExecutionEngine): self._engine = engine - self._fs = engine.fs self._logger = engine.log self._loads: Dict[str, Callable[..., DataFrame]] = { "csv": self._load_csv, @@ -63,15 +64,12 @@ def save_df( **kwargs: Any, ) -> None: partition_spec = partition_spec or PartitionSpec() - if self._fs.exists(uri): + if exists(uri): assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: - self._fs.remove(uri) - except Exception: - try: - self._fs.removetree(uri) - except Exception: # pragma: no cover - pass + rm(uri, recursive=True) + except Exception: # pragma: no cover + pass p = FileParser(uri, format_hint) if not force_single: df = self._prepartition(df, partition_spec=partition_spec) @@ -79,8 +77,8 @@ def save_df( self._saves[p.file_format](df=df, uri=p.uri, **kwargs) else: ldf = df.as_local() - self._fs.makedirs(os.path.dirname(uri), recreate=True) - save_df(ldf, uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs) + makedirs(os.path.dirname(uri), exist_ok=True) + save_df(ldf, uri, format_hint=format_hint, mode=mode, **kwargs) def _save_parquet( self, diff --git a/fugue_spark/_utils/io.py b/fugue_spark/_utils/io.py index c36a8856..0e288442 100644 --- a/fugue_spark/_utils/io.py +++ b/fugue_spark/_utils/io.py @@ -4,7 +4,6 @@ from pyspark.sql import SparkSession from triad.collections import Schema from triad.collections.dict import ParamDict -from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw from fugue._utils.io import FileParser, save_df @@ -16,9 +15,8 @@ class SparkIO(object): - def __init__(self, spark_session: SparkSession, fs: FileSystem): + def __init__(self, spark_session: SparkSession): self._session = spark_session - self._fs = fs self._loads: Dict[str, Callable[..., DataFrame]] = { "csv": self._load_csv, "parquet": self._load_parquet, @@ -64,7 +62,7 @@ def save_df( ldf = df.as_local() if isinstance(ldf, PandasDataFrame) and hasattr(ldf.native, "attrs"): ldf.native.attrs = {} # pragma: no cover - save_df(ldf, uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs) + 
save_df(ldf, uri, format_hint=format_hint, mode=mode, **kwargs) def _get_writer( self, sdf: ps.DataFrame, partition_spec: PartitionSpec diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py index 4a3d917b..fa527b14 100644 --- a/fugue_spark/execution_engine.py +++ b/fugue_spark/execution_engine.py @@ -11,7 +11,7 @@ from pyspark.sql import SparkSession from pyspark.sql.functions import broadcast, col, lit, row_number from pyspark.sql.window import Window -from triad import FileSystem, IndexedOrderedDict, ParamDict, Schema, SerializableRLock +from triad import IndexedOrderedDict, ParamDict, Schema, SerializableRLock from triad.utils.assertion import assert_arg_not_none, assert_or_throw from triad.utils.hash import to_uuid from triad.utils.iter import EmptyAwareIterable @@ -360,13 +360,12 @@ def __init__(self, spark_session: Optional[SparkSession] = None, conf: Any = Non cf.update(ParamDict(conf)) super().__init__(cf) self._lock = SerializableRLock() - self._fs = FileSystem() self._log = logging.getLogger() self._broadcast_func = RunOnce( self._broadcast, lambda *args, **kwargs: id(args[0]) ) self._persist_func = RunOnce(self._persist, lambda *args, **kwargs: id(args[0])) - self._io = SparkIO(self.spark_session, self.fs) + self._io = SparkIO(self.spark_session) self._registered_dfs: Dict[str, SparkDataFrame] = {} def __repr__(self) -> str: @@ -395,10 +394,6 @@ def is_distributed(self) -> bool: def log(self) -> logging.Logger: return self._log - @property - def fs(self) -> FileSystem: - return self._fs - def create_default_sql_engine(self) -> SQLEngine: return SparkSQLEngine(self) diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index aa3af323..53b70bdc 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -12,11 +12,12 @@ from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional from unittest import TestCase from uuid import uuid4 - +from triad.utils.io import write_text, join import numpy as np import pandas as pd import pyarrow as pa import pytest +from fsspec.implementations.local import LocalFileSystem from pytest import raises from triad import SerializableRLock @@ -28,7 +29,6 @@ DataFrame, DataFrames, ExecutionEngine, - FileSystem, FugueWorkflow, LocalDataFrame, OutputCoTransformer, @@ -65,6 +65,8 @@ FugueWorkflowRuntimeValidationError, ) +_LOCAL_FS = LocalFileSystem(auto_mkdir=True) + class BuiltInTests(object): """Workflow level general test suite. 
It is a more general end to end @@ -633,9 +635,8 @@ def test_out_transform(self): # noqa: C901 tmpdir = str(self.tmpdir) def incr(): - fs = FileSystem(auto_close=False).makedirs(tmpdir, recreate=True) - fs.writetext(str(uuid4()) + ".txt", "") - return fs.glob("*.txt").count().files + write_text(join(tmpdir, str(uuid4()) + ".txt"), "") + return len(_LOCAL_FS.glob(join(tmpdir, "*.txt"))) def t1(df: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: for row in df: @@ -717,9 +718,8 @@ def test_out_cotransform(self): # noqa: C901 tmpdir = str(self.tmpdir) def incr(): - fs = FileSystem(auto_close=False).makedirs(tmpdir, recreate=True) - fs.writetext(str(uuid4()) + ".txt", "") - return fs.glob("*.tx" "t").count().files + write_text(join(tmpdir, str(uuid4()) + ".txt"), "") + return len(_LOCAL_FS.glob(join(tmpdir, "*.txt"))) def t1( df: Iterable[Dict[str, Any]], df2: pd.DataFrame @@ -1348,7 +1348,7 @@ def test_io(self): b.partition(num=3).save(path, fmt="parquet", single=True) b.save(path2, header=True) dag.run(self.engine) - assert FileSystem().isfile(path) + assert _LOCAL_FS.isfile(path) with FugueWorkflow() as dag: a = dag.load(path, fmt="parquet", columns=["a", "c"]) a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int")) @@ -1359,9 +1359,9 @@ def test_io(self): b = dag.df([[6, 1], [2, 7]], "c:int,a:long") b.partition(by="c").save(path3, fmt="parquet", single=False) dag.run(self.engine) - assert FileSystem().isdir(path3) - assert FileSystem().isdir(os.path.join(path3, "c=6")) - assert FileSystem().isdir(os.path.join(path3, "c=2")) + assert _LOCAL_FS.isdir(path3) + assert _LOCAL_FS.isdir(os.path.join(path3, "c=6")) + assert _LOCAL_FS.isdir(os.path.join(path3, "c=2")) # TODO: in test below, once issue #288 is fixed, use dag.load # instead of pd.read_parquet pdf = pd.read_parquet(path3).sort_values("a").reset_index(drop=True) diff --git a/fugue_test/execution_suite.py b/fugue_test/execution_suite.py index bf3c6148..fb428ed9 100644 --- a/fugue_test/execution_suite.py +++ b/fugue_test/execution_suite.py @@ -15,8 +15,8 @@ import pandas as pd import pytest from pytest import raises -from triad.collections.fs import FileSystem from triad.exceptions import InvalidOperationError +from triad.utils.io import isfile, makedirs, touch import fugue.api as fa import fugue.column.functions as ff @@ -62,7 +62,6 @@ def make_engine(self) -> ExecutionEngine: # pragma: no cover def test_init(self): print(self.engine) assert self.engine.log is not None - assert self.engine.fs is not None assert copy.copy(self.engine) is self.engine assert copy.deepcopy(self.engine) is self.engine @@ -989,13 +988,12 @@ def init_tmpdir(self, tmpdir): self.tmpdir = tmpdir def test_save_single_and_load_parquet(self): - e = self.engine b = ArrayDataFrame([[6, 1], [2, 7]], "c:int,a:long") path = os.path.join(self.tmpdir, "a", "b") - e.fs.makedirs(path, recreate=True) + makedirs(path, exist_ok=True) # over write folder with single file fa.save(b, path, format_hint="parquet", force_single=True) - assert e.fs.isfile(path) + assert isfile(path) c = fa.load(path, format_hint="parquet", columns=["a", "c"], as_fugue=True) df_eq(c, [[1, 6], [7, 2]], "a:long,c:int", throw=True) @@ -1019,7 +1017,7 @@ def test_load_parquet_folder(self): path = os.path.join(self.tmpdir, "a", "b") fa.save(a, os.path.join(path, "a.parquet"), engine=native) fa.save(b, os.path.join(path, "b.parquet"), engine=native) - FileSystem().touch(os.path.join(path, "_SUCCESS")) + touch(os.path.join(path, "_SUCCESS")) c = fa.load(path, format_hint="parquet", columns=["a", "c"], 
as_fugue=True) df_eq(c, [[1, 6], [7, 2], [8, 4]], "a:long,c:int", throw=True) @@ -1038,13 +1036,12 @@ def test_load_parquet_files(self): df_eq(c, [[1, 6], [7, 2], [8, 4]], "a:long,c:int", throw=True) def test_save_single_and_load_csv(self): - e = self.engine b = ArrayDataFrame([[6.1, 1.1], [2.1, 7.1]], "c:double,a:double") path = os.path.join(self.tmpdir, "a", "b") - e.fs.makedirs(path, recreate=True) + makedirs(path, exist_ok=True) # over write folder with single file fa.save(b, path, format_hint="csv", header=True, force_single=True) - assert e.fs.isfile(path) + assert isfile(path) c = fa.load( path, format_hint="csv", header=True, infer_schema=False, as_fugue=True ) @@ -1099,13 +1096,12 @@ def test_save_single_and_load_csv(self): df_eq(c, [["1.1", "60.1"], ["7.1", "20.1"]], "a:str,c:str", throw=True) def test_save_single_and_load_csv_no_header(self): - e = self.engine b = ArrayDataFrame([[6.1, 1.1], [2.1, 7.1]], "c:double,a:double") path = os.path.join(self.tmpdir, "a", "b") - e.fs.makedirs(path, recreate=True) + makedirs(path, exist_ok=True) # over write folder with single file fa.save(b, path, format_hint="csv", header=False, force_single=True) - assert e.fs.isfile(path) + assert isfile(path) with raises(ValueError): c = fa.load( @@ -1190,7 +1186,7 @@ def test_load_csv_folder(self): header=True, engine=native, ) - FileSystem().touch(os.path.join(path, "_SUCCESS")) + touch(os.path.join(path, "_SUCCESS")) c = fa.load( path, format_hint="csv", @@ -1204,13 +1200,12 @@ def test_load_csv_folder(self): ) def test_save_single_and_load_json(self): - e = self.engine b = ArrayDataFrame([[6, 1], [2, 7]], "c:int,a:long") path = os.path.join(self.tmpdir, "a", "b") - e.fs.makedirs(path, recreate=True) + makedirs(path, exist_ok=True) # over write folder with single file fa.save(b, path, format_hint="json", force_single=True) - assert e.fs.isfile(path) + assert isfile(path) c = fa.load(path, format_hint="json", columns=["a", "c"], as_fugue=True) df_eq(c, [[1, 6], [7, 2]], "a:long,c:long", throw=True) @@ -1241,7 +1236,7 @@ def test_load_json_folder(self): path = os.path.join(self.tmpdir, "a", "b") fa.save(a, os.path.join(path, "a.json"), format_hint="json", engine=native) fa.save(b, os.path.join(path, "b.json"), format_hint="json", engine=native) - FileSystem().touch(os.path.join(path, "_SUCCESS")) + touch(os.path.join(path, "_SUCCESS")) c = fa.load(path, format_hint="json", columns=["a", "c"], as_fugue=True) df_eq(c, [[1, 6], [7, 2], [8, 4], [4, 3]], "a:long,c:long", throw=True) diff --git a/tests/fugue/dataframe/test_utils.py b/tests/fugue/dataframe/test_utils.py index a23f32e6..00c36d1e 100644 --- a/tests/fugue/dataframe/test_utils.py +++ b/tests/fugue/dataframe/test_utils.py @@ -5,7 +5,7 @@ import pandas as pd import pyarrow as pa from pytest import raises -from triad import FileSystem, Schema +from triad import Schema from triad.collections.schema import SchemaError from triad.exceptions import InvalidOperationError, NoneArgumentError @@ -112,7 +112,6 @@ def assert_eq(df, df_expected=None, raw=False): else: df_eq(df_expected, df_actual, throw=True) - fs = FileSystem() assert deserialize_df(serialize_df(None)) is None assert_eq(ArrayDataFrame([], "a:int,b:int")) assert_eq(ArrayDataFrame([[None, None]], "a:int,b:int")) @@ -136,8 +135,7 @@ def assert_eq(df, df_expected=None, raw=False): path = os.path.join(tmpdir, "1.pkl") df = ArrayDataFrame([[None, None]], "a:int,b:int") - s = serialize_df(df, 0, path, fs) - df_eq(df, deserialize_df(s, fs), throw=True) + s = serialize_df(df, 0, path) df_eq(df, 
deserialize_df(s), throw=True) s = serialize_df(df, 0, path) diff --git a/tests/fugue/utils/test_io.py b/tests/fugue/utils/test_io.py index e8d157dd..aa9883ed 100644 --- a/tests/fugue/utils/test_io.py +++ b/tests/fugue/utils/test_io.py @@ -1,13 +1,12 @@ import os +from pytest import raises +from triad.utils.io import makedirs, read_text, touch + +from fugue._utils.io import _FORMAT_MAP, FileParser, load_df, save_df from fugue.dataframe.array_dataframe import ArrayDataFrame from fugue.dataframe.pandas_dataframe import PandasDataFrame from fugue.dataframe.utils import _df_eq as df_eq -from fugue._utils.io import FileParser, load_df, save_df, _FORMAT_MAP -from fugue.exceptions import FugueDataFrameOperationError -from pytest import raises -from triad.collections.fs import FileSystem -from triad.exceptions import InvalidOperationError def test_file_parser(): @@ -132,14 +131,13 @@ def test_parquet_io(tmpdir): raises(Exception, lambda: load_df(path, columns="bb:str,a:int")) # load directory - fs = FileSystem() for name in ["folder.parquet", "folder"]: folder = os.path.join(tmpdir, name) - fs.makedirs(folder) + makedirs(folder) f0 = os.path.join(folder, "_SUCCESS") f1 = os.path.join(folder, "1.parquet") f2 = os.path.join(folder, "3.parquet") - fs.touch(f0) + touch(f0) save_df(df1, f1) save_df(df1, f2) @@ -178,12 +176,11 @@ def test_parquet_io(tmpdir): def test_csv_io(tmpdir): - fs = FileSystem() df1 = PandasDataFrame([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.csv") # without header save_df(df1, path) - assert fs.readtext(path).startswith("1,2,3") + assert read_text(path).startswith("1,2,3") raises(ValueError, lambda: load_df(path, header=False)) actual = load_df(path, columns=["a", "b", "c"], header=False, infer_schema=True) assert [[1, 2, 3]] == actual.as_array() @@ -193,7 +190,7 @@ def test_csv_io(tmpdir): assert actual.schema == "a:double,b:str,c:str" # with header save_df(df1, path, header=True) - assert fs.readtext(path).startswith("a,b,c") + assert read_text(path).startswith("a,b,c") actual = load_df(path, header=True) assert [["1", "2", "3"]] == actual.as_array() actual = load_df(path, header=True, infer_schema=True) @@ -210,7 +207,6 @@ def test_csv_io(tmpdir): def test_json(tmpdir): - fs = FileSystem() df1 = PandasDataFrame([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.json") save_df(df1, path) diff --git a/tests/fugue/workflow/test_workflow_determinism.py b/tests/fugue/workflow/test_workflow_determinism.py index cc3f0483..a2645b32 100644 --- a/tests/fugue/workflow/test_workflow_determinism.py +++ b/tests/fugue/workflow/test_workflow_determinism.py @@ -1,6 +1,6 @@ from typing import Any, Dict, Iterable, List -from fugue import FileSystem, FugueWorkflow, Schema +from fugue import FugueWorkflow, Schema from fugue.execution.native_execution_engine import NativeExecutionEngine from triad import to_uuid diff --git a/tests/fugue_dask/test_io.py b/tests/fugue_dask/test_io.py index 7cca8c78..3e063bbc 100644 --- a/tests/fugue_dask/test_io.py +++ b/tests/fugue_dask/test_io.py @@ -1,8 +1,7 @@ import os from pytest import mark, raises -from triad.collections.fs import FileSystem -from triad.exceptions import InvalidOperationError +from triad.utils.io import makedirs, touch from fugue._utils.io import save_df as pd_save_df from fugue.dataframe.utils import _df_eq as df_eq @@ -30,14 +29,13 @@ def test_parquet_io(tmpdir, fugue_dask_client): raises(Exception, lambda: load_df(path, columns="bb:str,a:int")) # load directory - fs = FileSystem() for name in 
["folder.parquet", "folder"]: folder = os.path.join(tmpdir, name) - fs.makedirs(folder) + makedirs(folder) f0 = os.path.join(folder, "_SUCCESS") f1 = os.path.join(folder, "1.parquet") f2 = os.path.join(folder, "3.parquet") - fs.touch(f0) + touch(f0) pd_save_df(df1, f1) pd_save_df(df1, f2) diff --git a/tests/fugue_ray/test_execution_engine.py b/tests/fugue_ray/test_execution_engine.py index ba55cdde..9f45f135 100644 --- a/tests/fugue_ray/test_execution_engine.py +++ b/tests/fugue_ray/test_execution_engine.py @@ -4,7 +4,7 @@ import pandas as pd import pytest import ray.data as rd -from triad import FileSystem +from triad.utils.io import isfile import fugue.api as fa from fugue import ArrayDataFrame, DataFrame, FugueWorkflow, transform @@ -189,7 +189,7 @@ def make_engine(self): connection=self._con, ) return e - + def test_datetime_in_workflow(self): pass @@ -223,7 +223,7 @@ def test_io(self): b.partition(num=3).save(path, fmt="parquet", single=True) b.save(path2, header=True) dag.run(self.engine) - assert FileSystem().isfile(path) + assert isfile(path) with FugueWorkflow() as dag: a = dag.load(path, fmt="parquet", columns=["a", "c"]) a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int")) @@ -236,9 +236,9 @@ def test_io(self): # with FugueWorkflow() as dag: # b = dag.df([[6, 1], [2, 7]], "c:int,a:long") # b.partition(by="c").save(path3, fmt="parquet", single=False) - # assert FileSystem().isdir(path3) - # assert FileSystem().isdir(os.path.join(path3, "c=6")) - # assert FileSystem().isdir(os.path.join(path3, "c=2")) + # assert isdir(path3) + # assert isdir(os.path.join(path3, "c=6")) + # assert isdir(os.path.join(path3, "c=2")) # # TODO: in test below, once issue #288 is fixed, use dag.load # # instead of pd.read_parquet # pd.testing.assert_frame_equal( diff --git a/tests/fugue_spark/utils/test_io.py b/tests/fugue_spark/utils/test_io.py index 2c6356e0..c0532139 100644 --- a/tests/fugue_spark/utils/test_io.py +++ b/tests/fugue_spark/utils/test_io.py @@ -1,20 +1,20 @@ import os +from pyspark.sql import SparkSession +from pyspark.sql.utils import AnalysisException +from pytest import raises +from triad.utils.io import isfile, makedirs, touch + from fugue.collections.partition import PartitionSpec from fugue.dataframe.pandas_dataframe import PandasDataFrame from fugue.dataframe.utils import _df_eq as df_eq -from fugue_spark.dataframe import SparkDataFrame from fugue_spark._utils.convert import to_schema, to_spark_schema from fugue_spark._utils.io import SparkIO -from pyspark.sql import SparkSession -from pyspark.sql.utils import AnalysisException -from pytest import raises -from triad.collections.fs import FileSystem -from triad.exceptions import InvalidOperationError +from fugue_spark.dataframe import SparkDataFrame def test_parquet_io(tmpdir, spark_session): - si = SparkIO(spark_session, FileSystem()) + si = SparkIO(spark_session) df1 = _df([["1", 2, 3]], "a:str,b:int,c:long") df2 = _df([[[1, 2]]], "a:[int]") # {a:int} will become {a:long} because pyarrow lib has issue @@ -33,16 +33,15 @@ def test_parquet_io(tmpdir, spark_session): raises(Exception, lambda: si.load_df(path, columns="bb:str,a:int")) # load directory - fs = FileSystem() folder = os.path.join(tmpdir, "folder") - fs.makedirs(folder) + makedirs(folder) f0 = os.path.join(folder, "_SUCCESS") f1 = os.path.join(folder, "1.parquet") f2 = os.path.join(folder, "3.parquet") - fs.touch(f0) + touch(f0) si.save_df(df1, f1, force_single=True) si.save_df(df1, f2, force_single=True) - assert fs.isfile(f1) + assert isfile(f1) actual = 
si.load_df(folder, "parquet") df_eq(actual, [["1", 2, 3], ["1", 2, 3]], "a:str,b:int,c:long") @@ -61,8 +60,7 @@ def test_parquet_io(tmpdir, spark_session): def test_csv_io(tmpdir, spark_session): - fs = FileSystem() - si = SparkIO(spark_session, fs) + si = SparkIO(spark_session) df1 = _df([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.csv") # without header @@ -91,8 +89,7 @@ def test_csv_io(tmpdir, spark_session): def test_json_io(tmpdir, spark_session): - fs = FileSystem() - si = SparkIO(spark_session, fs) + si = SparkIO(spark_session) df1 = _df([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.json") si.save_df(df1, path) @@ -106,7 +103,7 @@ def test_json_io(tmpdir, spark_session): def test_save_with_partition(tmpdir, spark_session): - si = SparkIO(spark_session, FileSystem()) + si = SparkIO(spark_session) df1 = _df([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.parquet") si.save_df(df1, path, partition_spec=PartitionSpec(num=2)) From e234940c281b98059ec9779272b3ce8bf8aef18d Mon Sep 17 00:00:00 2001 From: Han Wang Date: Mon, 30 Oct 2023 05:56:44 +0000 Subject: [PATCH 02/11] update --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a4d81db0..3e29569b 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ def get_version() -> str: keywords="distributed spark dask ray sql dsl domain specific language", url="http://github.com/fugue-project/fugue", install_requires=[ - "triad==0.9.2.dev5", + "triad==0.9.2.dev6", "adagio>=0.2.4", # sql dependencies "qpd>=0.4.4", From 97874a8b3c3c90fe65fde86e38be0735ff33acff Mon Sep 17 00:00:00 2001 From: Han Wang Date: Mon, 30 Oct 2023 07:01:13 +0000 Subject: [PATCH 03/11] update --- fugue/_utils/io.py | 32 +++++++++--------- tests/fugue/utils/test_io.py | 65 +++++++++++++++++++++--------------- 2 files changed, 53 insertions(+), 44 deletions(-) diff --git a/fugue/_utils/io.py b/fugue/_utils/io.py index 61db1912..47206fd4 100644 --- a/fugue/_utils/io.py +++ b/fugue/_utils/io.py @@ -1,10 +1,10 @@ import os import pathlib from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union -from urllib.parse import urlparse import pandas as pd from fsspec import AbstractFileSystem +from fsspec.core import split_protocol from triad.collections.dict import ParamDict from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw @@ -19,20 +19,22 @@ def __init__(self, path: str, format_hint: Optional[str] = None): last = len(path) has_glob = False self._orig_format_hint = format_hint + p, _ = split_protocol(path) + sep = os.path.sep if p is None else "/" for i in range(len(path)): - if path[i] in ["/", "\\"]: + if path[i] == sep: last = i if path[i] in ["*", "?"]: has_glob = True break if not has_glob: - self._uri = urlparse(path) + self._uri = path self._glob_pattern = "" - self._path = self._uri.path + self._raw_path = path else: - self._uri = urlparse(path[:last]) + self._uri = path[:last] self._glob_pattern = path[last + 1 :] - self._path = join(self._uri.path, self._glob_pattern) + self._raw_path = path if format_hint is None or format_hint == "": for k, v in _FORMAT_MAP.items(): @@ -48,7 +50,7 @@ def __init__(self, path: str, format_hint: Optional[str] = None): self._format = format_hint def assert_no_glob(self) -> "FileParser": - assert_or_throw(self.glob_pattern == "", f"{self.path} has glob pattern") + assert_or_throw(self.glob_pattern == "", f"{self.raw_path} has glob pattern") return self def 
with_glob(self, glob: str, format_hint: Optional[str] = None) -> "FileParser": @@ -62,9 +64,9 @@ def find_all(self, fs: Optional[AbstractFileSystem] = None) -> List[str]: return [self.uri] else: if fs is None: - return self.fs.glob(self._path) + return self.fs.glob(self.raw_path) else: - return fs.glob(self._path) + return fs.glob(self.raw_path) @property def fs(self) -> AbstractFileSystem: @@ -77,7 +79,7 @@ def glob_pattern(self) -> str: @property def uri(self) -> str: - return self._uri.geturl() + return self._uri @property def uri_with_glob(self) -> str: @@ -91,16 +93,12 @@ def parent(self) -> str: return dn if dn != "" else "." @property - def scheme(self) -> str: - return self._uri.scheme - - @property - def path(self) -> str: - return self._path + def raw_path(self) -> str: + return self._raw_path @property def suffix(self) -> str: - return "".join(pathlib.Path(self.path.lower()).suffixes) + return "".join(pathlib.Path(self.raw_path.lower()).suffixes) @property def file_format(self) -> str: diff --git a/tests/fugue/utils/test_io.py b/tests/fugue/utils/test_io.py index aa9883ed..f2a68700 100644 --- a/tests/fugue/utils/test_io.py +++ b/tests/fugue/utils/test_io.py @@ -1,5 +1,7 @@ import os +import sys +import pytest from pytest import raises from triad.utils.io import makedirs, read_text, touch @@ -9,55 +11,67 @@ from fugue.dataframe.utils import _df_eq as df_eq +@pytest.mark.skipif(sys.platform.startswith("win"), reason="not a test for windows") +def test_file_parser_linux(): + f = FileParser("/a/b/c.parquet") + assert "/a/b/c.parquet" == f.uri + assert "/a/b/c.parquet" == f.uri_with_glob + assert ".parquet" == f.suffix + assert "parquet" == f.file_format + assert "" == f.glob_pattern + assert "/a/b" == f.parent + + +@pytest.mark.skipif( + not sys.platform.startswith("win"), reason="a test only for windows" +) +def test_file_parser_win(): + f = FileParser("c:\\a\\c.parquet") + assert "c:\\a\\c.parquet" == f.uri + assert "c:\\a\\c.parquet" == f.uri_with_glob + assert ".parquet" == f.suffix + assert "parquet" == f.file_format + assert "" == f.glob_pattern + assert "c:\\a" == f.parent + + f = FileParser("c:\\a\\*.parquet") + assert "c:\\a" == f.uri + assert "c:\\a\\*.parquet" == f.uri_with_glob + assert ".parquet" == f.suffix + assert "parquet" == f.file_format + assert "*.parquet" == f.glob_pattern + assert "c:\\a" == f.parent + + def test_file_parser(): f = FileParser("c.parquet") assert "c.parquet" == f.uri assert "c.parquet" == f.uri_with_glob - assert "" == f.scheme - assert "c.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format assert "" == f.glob_pattern assert "." 
== f.parent - f = FileParser("/a/b/c.parquet") - assert "/a/b/c.parquet" == f.uri - assert "/a/b/c.parquet" == f.uri_with_glob - assert "" == f.scheme - assert "/a/b/c.parquet" == f.path - assert ".parquet" == f.suffix - assert "parquet" == f.file_format - assert "" == f.glob_pattern - assert "/a/b" == f.parent - for k, v in _FORMAT_MAP.items(): f = FileParser(f"s3://a/b/c{k}") assert f"s3://a/b/c{k}" == f.uri - assert "s3" == f.scheme - assert f"/b/c{k}" == f.path assert k == f.suffix assert v == f.file_format assert "s3://a/b" == f.parent f = FileParser("s3://a/b/c.test.parquet") assert "s3://a/b/c.test.parquet" == f.uri - assert "s3" == f.scheme - assert "/b/c.test.parquet" == f.path assert ".test.parquet" == f.suffix assert "parquet" == f.file_format assert "s3://a/b" == f.parent f = FileParser("s3://a/b/c.ppp.gz", "csv") assert "s3://a/b/c.ppp.gz" == f.uri - assert "s3" == f.scheme - assert "/b/c.ppp.gz" == f.path assert ".ppp.gz" == f.suffix assert "csv" == f.file_format f = FileParser("s3://a/b/c", "csv") assert "s3://a/b/c" == f.uri - assert "s3" == f.scheme - assert "/b/c" == f.path assert "" == f.suffix assert "csv" == f.file_format @@ -66,11 +80,10 @@ def test_file_parser(): raises(NotImplementedError, lambda: FileParser("s3://a/b/c")) -def test_file_parser_glob(): +@pytest.mark.skipif(sys.platform.startswith("win"), reason="not a test for windows") +def test_file_parser_glob_linux(): f = FileParser("/a/b/*.parquet") assert "/a/b" == f.uri - assert "" == f.scheme - assert "/a/b/*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format assert "*.parquet" == f.glob_pattern @@ -78,17 +91,15 @@ def test_file_parser_glob(): f = FileParser("/a/b/*123.parquet") assert "/a/b" == f.uri - assert "" == f.scheme - assert "/a/b/*123.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format assert "*123.parquet" == f.glob_pattern assert "/a/b/*123.parquet" == f.uri_with_glob + +def test_file_parser_glob(): f = FileParser("s3://a/b/*.parquet") assert "s3://a/b" == f.uri - assert "s3" == f.scheme - assert "/b/*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format assert "*.parquet" == f.glob_pattern From 1988b87937517989e4b57ee8131a378f1e19e6f4 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Tue, 31 Oct 2023 06:35:31 +0000 Subject: [PATCH 04/11] update --- fugue/_utils/io.py | 20 +++----------------- setup.py | 2 +- tests/fugue/utils/test_io.py | 2 +- 3 files changed, 5 insertions(+), 19 deletions(-) diff --git a/fugue/_utils/io.py b/fugue/_utils/io.py index 47206fd4..cfd617df 100644 --- a/fugue/_utils/io.py +++ b/fugue/_utils/io.py @@ -8,7 +8,7 @@ from triad.collections.dict import ParamDict from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw -from triad.utils.io import join, url_to_fs +from triad.utils.io import join, url_to_fs, glob as get_glob from triad.utils.pandas_like import PD_UTILS from fugue.dataframe import LocalBoundedDataFrame, LocalDataFrame, PandasDataFrame @@ -59,20 +59,6 @@ def with_glob(self, glob: str, format_hint: Optional[str] = None) -> "FileParser uri = join(uri, glob) return FileParser(uri, format_hint or self._orig_format_hint) - def find_all(self, fs: Optional[AbstractFileSystem] = None) -> List[str]: - if self.glob_pattern == "": - return [self.uri] - else: - if fs is None: - return self.fs.glob(self.raw_path) - else: - return fs.glob(self.raw_path) - - @property - def fs(self) -> AbstractFileSystem: - _fs, _ = url_to_fs(self.uri) - return _fs - 
@property def glob_pattern(self) -> str: return self._glob_pattern @@ -142,7 +128,7 @@ def save_df( assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: fs.rm(uri, recursive=True) - except Exception: + except Exception: # pragma: no cover pass _FORMAT_SAVE[p.file_format](df, p, **kwargs) @@ -152,7 +138,7 @@ def _get_single_files( ) -> Iterable[FileParser]: for f in fp: if f.glob_pattern != "": - files = [FileParser(x) for x in f.find_all(fs)] + files = [FileParser(x) for x in get_glob(f.raw_path)] yield from _get_single_files(files, fs) else: yield f diff --git a/setup.py b/setup.py index 3e29569b..6bea5afc 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ def get_version() -> str: keywords="distributed spark dask ray sql dsl domain specific language", url="http://github.com/fugue-project/fugue", install_requires=[ - "triad==0.9.2.dev6", + "triad==0.9.2.dev7", "adagio>=0.2.4", # sql dependencies "qpd>=0.4.4", diff --git a/tests/fugue/utils/test_io.py b/tests/fugue/utils/test_io.py index f2a68700..70805810 100644 --- a/tests/fugue/utils/test_io.py +++ b/tests/fugue/utils/test_io.py @@ -40,7 +40,7 @@ def test_file_parser_win(): assert ".parquet" == f.suffix assert "parquet" == f.file_format assert "*.parquet" == f.glob_pattern - assert "c:\\a" == f.parent + assert "c:\\" == f.parent def test_file_parser(): From db7617543c6ffdd28e424085eced238bfdead056 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Wed, 1 Nov 2023 05:45:50 +0000 Subject: [PATCH 05/11] update --- fugue/_utils/io.py | 31 +++++++++++++++++++---------- fugue_test/execution_suite.py | 2 +- fugue_test/plugins/misc/__init__.py | 2 ++ fugue_test/plugins/misc/fixtures.py | 18 +++++++++++++++++ requirements.txt | 2 ++ setup.py | 1 + tests/fugue/utils/test_io.py | 12 +++++++++-- 7 files changed, 55 insertions(+), 13 deletions(-) create mode 100644 fugue_test/plugins/misc/__init__.py create mode 100644 fugue_test/plugins/misc/fixtures.py diff --git a/fugue/_utils/io.py b/fugue/_utils/io.py index cfd617df..56ce1b5b 100644 --- a/fugue/_utils/io.py +++ b/fugue/_utils/io.py @@ -8,7 +8,7 @@ from triad.collections.dict import ParamDict from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw -from triad.utils.io import join, url_to_fs, glob as get_glob +from triad.utils.io import join, url_to_fs from triad.utils.pandas_like import PD_UTILS from fugue.dataframe import LocalBoundedDataFrame, LocalDataFrame, PandasDataFrame @@ -75,8 +75,8 @@ def uri_with_glob(self) -> str: @property def parent(self) -> str: - dn = os.path.dirname(self.uri) - return dn if dn != "" else "." 
+ fs, _path = url_to_fs(self.uri) + return fs.unstrip_protocol(fs._parent(_path)) @property def raw_path(self) -> str: @@ -138,7 +138,8 @@ def _get_single_files( ) -> Iterable[FileParser]: for f in fp: if f.glob_pattern != "": - files = [FileParser(x) for x in get_glob(f.raw_path)] + fs, _ = url_to_fs(f.uri) + files = [FileParser(fs.unstrip_protocol(x)) for x in fs.glob(f.raw_path)] yield from _get_single_files(files, fs) else: yield f @@ -179,10 +180,12 @@ def _save_csv(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: def _safe_load_csv(path: str, **kwargs: Any) -> pd.DataFrame: def load_dir() -> pd.DataFrame: - fs, _ = url_to_fs(path) - return pd.concat( - [pd.read_csv(x, **kwargs) for x in fs.glob(join(path, "*.csv"))] - ) + fs, _path = url_to_fs(path) + dfs: List[pd.DataFrame] = [] + for _p in fs.glob(fs.sep.join([_path, "*.csv"])): # type: ignore + with fs.open(_p, "r") as f: + dfs.append(pd.read_csv(f, **kwargs)) + return pd.concat(dfs) try: return pd.read_csv(path, **kwargs) @@ -241,11 +244,19 @@ def _save_json(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: def _safe_load_json(path: str, **kwargs: Any) -> pd.DataFrame: kw = {"orient": "records", "lines": True, **kwargs} + + def load_dir() -> pd.DataFrame: + fs, _path = url_to_fs(path) + dfs: List[pd.DataFrame] = [] + for _p in fs.glob(fs.sep.join([_path, "*.json"])): # type: ignore + with fs.open(_p, "r") as f: + dfs.append(pd.read_json(f, **kw)) + return pd.concat(dfs) + try: return pd.read_json(path, **kw) except (IsADirectoryError, PermissionError): - fs, _ = url_to_fs(path) - return pd.concat([pd.read_json(x, **kw) for x in fs.glob(join(path, "*.json"))]) + return load_dir() def _load_json( diff --git a/fugue_test/execution_suite.py b/fugue_test/execution_suite.py index fb428ed9..ec1c8aa6 100644 --- a/fugue_test/execution_suite.py +++ b/fugue_test/execution_suite.py @@ -984,7 +984,7 @@ def on_init(partition_no, dfs): df_eq(res, [[1, "z1"]], "a:int,v:str", throw=True) @pytest.fixture(autouse=True) - def init_tmpdir(self, tmpdir): + def init_tmpdir(self, tmpdir, tmp_mem_dir): self.tmpdir = tmpdir def test_save_single_and_load_parquet(self): diff --git a/fugue_test/plugins/misc/__init__.py b/fugue_test/plugins/misc/__init__.py new file mode 100644 index 00000000..9407d26a --- /dev/null +++ b/fugue_test/plugins/misc/__init__.py @@ -0,0 +1,2 @@ +# flake8: noqa +from .fixtures import tmp_mem_dir diff --git a/fugue_test/plugins/misc/fixtures.py b/fugue_test/plugins/misc/fixtures.py new file mode 100644 index 00000000..6ca80012 --- /dev/null +++ b/fugue_test/plugins/misc/fixtures.py @@ -0,0 +1,18 @@ +import uuid + +import pytest +from triad.utils.io import makedirs, rm + + +@pytest.fixture +def tmp_mem_dir(): + uuid_str = str(uuid.uuid4())[:5] + path = "memory://test_" + uuid_str + makedirs(path) + try: + yield path + finally: + try: + rm(path, recursive=True) + except Exception: # pragma: no cover + pass diff --git a/requirements.txt b/requirements.txt index ff04ff56..b3eaef95 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,6 +25,8 @@ seaborn notebook<7 jupyter_contrib_nbextensions +s3fs + pyspark[connect] duckdb-engine>=0.6.4 sqlalchemy==2.0.10 # 2.0.11 has a bug diff --git a/setup.py b/setup.py index 6bea5afc..09cb7f2e 100644 --- a/setup.py +++ b/setup.py @@ -109,6 +109,7 @@ def get_version() -> str: "fugue_test_dask = fugue_test.plugins.dask[dask]", "fugue_test_ray = fugue_test.plugins.ray[ray]", "fugue_test_duckdb = fugue_test.plugins.duckdb[duckdb]", + "fugue_test_misc = fugue_test.plugins.misc", 
], }, ) diff --git a/tests/fugue/utils/test_io.py b/tests/fugue/utils/test_io.py index 70805810..ac171141 100644 --- a/tests/fugue/utils/test_io.py +++ b/tests/fugue/utils/test_io.py @@ -19,7 +19,7 @@ def test_file_parser_linux(): assert ".parquet" == f.suffix assert "parquet" == f.file_format assert "" == f.glob_pattern - assert "/a/b" == f.parent + assert "file:///a/b" == f.parent @pytest.mark.skipif( @@ -50,7 +50,15 @@ def test_file_parser(): assert ".parquet" == f.suffix assert "parquet" == f.file_format assert "" == f.glob_pattern - assert "." == f.parent + #assert "." == f.parent + + f = FileParser("memory:///c.parquet") + assert "memory:///c.parquet" == f.uri + assert "memory:///c.parquet" == f.uri_with_glob + assert ".parquet" == f.suffix + assert "parquet" == f.file_format + assert "" == f.glob_pattern + assert "memory:///" == f.parent for k, v in _FORMAT_MAP.items(): f = FileParser(f"s3://a/b/c{k}") From f9c8d28efe09d9ebf05ed6e1fc7e53512dee2534 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 2 Nov 2023 06:46:50 +0000 Subject: [PATCH 06/11] update --- .devcontainer/devcontainer.json | 6 +- fugue/_utils/io.py | 120 +++++++++++++++----------------- tests/fugue/utils/test_io.py | 75 +++++++++----------- 3 files changed, 93 insertions(+), 108 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 06732abf..55677d0a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,6 +1,5 @@ { "name": "Fugue Development Environment", - "_image": "fugueproject/devenv:0.7.7", "image": "mcr.microsoft.com/vscode/devcontainers/python:3.10", "customizations": { "vscode": { @@ -8,6 +7,7 @@ "terminal.integrated.shell.linux": "/bin/bash", "python.pythonPath": "/usr/local/bin/python", "python.defaultInterpreterPath": "/usr/local/bin/python", + "editor.defaultFormatter": "ms-python.black-formatter", "isort.interpreter": [ "/usr/local/bin/python" ], @@ -16,6 +16,9 @@ ], "pylint.interpreter": [ "/usr/local/bin/python" + ], + "black-formatter.interpreter": [ + "/usr/local/bin/python" ] }, "extensions": [ @@ -24,6 +27,7 @@ "ms-python.flake8", "ms-python.pylint", "ms-python.mypy", + "ms-python.black-formatter", "GitHub.copilot", "njpwerner.autodocstring" ] diff --git a/fugue/_utils/io.py b/fugue/_utils/io.py index 56ce1b5b..4f8bcf1e 100644 --- a/fugue/_utils/io.py +++ b/fugue/_utils/io.py @@ -1,10 +1,8 @@ -import os import pathlib from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union import pandas as pd from fsspec import AbstractFileSystem -from fsspec.core import split_protocol from triad.collections.dict import ParamDict from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw @@ -16,25 +14,11 @@ class FileParser(object): def __init__(self, path: str, format_hint: Optional[str] = None): - last = len(path) - has_glob = False self._orig_format_hint = format_hint - p, _ = split_protocol(path) - sep = os.path.sep if p is None else "/" - for i in range(len(path)): - if path[i] == sep: - last = i - if path[i] in ["*", "?"]: - has_glob = True - break - if not has_glob: - self._uri = path - self._glob_pattern = "" - self._raw_path = path - else: - self._uri = path[:last] - self._glob_pattern = path[last + 1 :] - self._raw_path = path + self._has_glob = "*" in path or "?" 
in path + self._raw_path = path + self._fs, self._fs_path = url_to_fs(path) + self._path = self._fs.unstrip_protocol(self._fs_path) if format_hint is None or format_hint == "": for k, v in _FORMAT_MAP.items(): @@ -50,33 +34,27 @@ def __init__(self, path: str, format_hint: Optional[str] = None): self._format = format_hint def assert_no_glob(self) -> "FileParser": - assert_or_throw(self.glob_pattern == "", f"{self.raw_path} has glob pattern") + assert_or_throw(not self.has_glob, f"{self.raw_path} has glob pattern") return self - def with_glob(self, glob: str, format_hint: Optional[str] = None) -> "FileParser": - uri = self.uri - if glob != "": - uri = join(uri, glob) - return FileParser(uri, format_hint or self._orig_format_hint) - @property - def glob_pattern(self) -> str: - return self._glob_pattern + def has_glob(self): + return self._has_glob - @property - def uri(self) -> str: - return self._uri + def join(self, path: str, format_hint: Optional[str] = None) -> "FileParser": + if not self.has_glob: + _path = join(self.path, path) + else: + _path = join(self.parent, path) + return FileParser(_path, format_hint or self._orig_format_hint) @property - def uri_with_glob(self) -> str: - if self.glob_pattern == "": - return self.uri - return join(self.uri, self.glob_pattern) + def parent(self) -> str: + return self._fs.unstrip_protocol(self._fs._parent(self._fs_path)) @property - def parent(self) -> str: - fs, _path = url_to_fs(self.uri) - return fs.unstrip_protocol(fs._parent(_path)) + def path(self) -> str: + return self._path @property def raw_path(self) -> str: @@ -90,6 +68,17 @@ def suffix(self) -> str: def file_format(self) -> str: return self._format + def find_all(self) -> Iterable["FileParser"]: + if self.has_glob: + for x in self._fs.glob(self._fs_path): + yield FileParser(self._fs.unstrip_protocol(x)) + else: + yield self + + def open(self, *args: Any, **kwargs: Any) -> Any: + self.assert_no_glob() + return self._fs.open(self._fs_path, *args, **kwargs) + def load_df( uri: Union[str, List[str]], @@ -137,19 +126,14 @@ def _get_single_files( fp: Iterable[FileParser], fs: Optional[AbstractFileSystem] ) -> Iterable[FileParser]: for f in fp: - if f.glob_pattern != "": - fs, _ = url_to_fs(f.uri) - files = [FileParser(fs.unstrip_protocol(x)) for x in fs.glob(f.raw_path)] - yield from _get_single_files(files, fs) - else: - yield f + yield from f.find_all() def _save_parquet(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: PD_UTILS.to_parquet_friendly( df.as_pandas(), partition_cols=kwargs.get("partition_cols", []) ).to_parquet( - p.uri, + p.assert_no_glob().path, **{ "engine": "pyarrow", "schema": df.schema.pa_schema, @@ -162,33 +146,36 @@ def _load_parquet( p: FileParser, columns: Any = None, **kwargs: Any ) -> Tuple[pd.DataFrame, Any]: if columns is None: - pdf = pd.read_parquet(p.uri, **{"engine": "pyarrow", **kwargs}) + pdf = pd.read_parquet(p.path, **{"engine": "pyarrow", **kwargs}) return pdf, None if isinstance(columns, list): # column names - pdf = pd.read_parquet(p.uri, columns=columns, **{"engine": "pyarrow", **kwargs}) + pdf = pd.read_parquet( + p.path, columns=columns, **{"engine": "pyarrow", **kwargs} + ) return pdf, None schema = Schema(columns) pdf = pd.read_parquet( - p.uri, columns=schema.names, **{"engine": "pyarrow", **kwargs} + p.path, columns=schema.names, **{"engine": "pyarrow", **kwargs} ) return pdf, schema def _save_csv(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: - df.as_pandas().to_csv(p.uri, **{"index": False, "header": False, **kwargs}) + 
with p.open("w") as f: + df.as_pandas().to_csv(f, **{"index": False, "header": False, **kwargs}) -def _safe_load_csv(path: str, **kwargs: Any) -> pd.DataFrame: +def _safe_load_csv(p: FileParser, **kwargs: Any) -> pd.DataFrame: def load_dir() -> pd.DataFrame: - fs, _path = url_to_fs(path) dfs: List[pd.DataFrame] = [] - for _p in fs.glob(fs.sep.join([_path, "*.csv"])): # type: ignore - with fs.open(_p, "r") as f: + for _p in p.join("*.csv").find_all(): # type: ignore + with _p.open("r") as f: dfs.append(pd.read_csv(f, **kwargs)) return pd.concat(dfs) try: - return pd.read_csv(path, **kwargs) + with p.open("r") as f: + return pd.read_csv(f, **kwargs) except IsADirectoryError: return load_dir() except pd.errors.ParserError: # pragma: no cover @@ -214,7 +201,7 @@ def _load_csv( # noqa: C901 header = kw["header"] del kw["header"] if str(header) in ["True", "0"]: - pdf = _safe_load_csv(p.uri, **{"index_col": False, "header": 0, **kw}) + pdf = _safe_load_csv(p, **{"index_col": False, "header": 0, **kw}) if columns is None: return pdf, None if isinstance(columns, list): # column names @@ -226,12 +213,14 @@ def _load_csv( # noqa: C901 raise ValueError("columns must be set if without header") if isinstance(columns, list): # column names pdf = _safe_load_csv( - p.uri, **{"index_col": False, "header": None, "names": columns, **kw} + p, + **{"index_col": False, "header": None, "names": columns, **kw}, ) return pdf, None schema = Schema(columns) pdf = _safe_load_csv( - p.uri, **{"index_col": False, "header": None, "names": schema.names, **kw} + p, + **{"index_col": False, "header": None, "names": schema.names, **kw}, ) return pdf, schema else: @@ -239,22 +228,23 @@ def _load_csv( # noqa: C901 def _save_json(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: - df.as_pandas().to_json(p.uri, **{"orient": "records", "lines": True, **kwargs}) + with p.open("w") as f: + df.as_pandas().to_json(f, **{"orient": "records", "lines": True, **kwargs}) -def _safe_load_json(path: str, **kwargs: Any) -> pd.DataFrame: +def _safe_load_json(p: FileParser, **kwargs: Any) -> pd.DataFrame: kw = {"orient": "records", "lines": True, **kwargs} def load_dir() -> pd.DataFrame: - fs, _path = url_to_fs(path) dfs: List[pd.DataFrame] = [] - for _p in fs.glob(fs.sep.join([_path, "*.json"])): # type: ignore - with fs.open(_p, "r") as f: + for _p in p.join("*.json").find_all(): # type: ignore + with _p.open("r") as f: dfs.append(pd.read_json(f, **kw)) return pd.concat(dfs) try: - return pd.read_json(path, **kw) + with p.open("r") as f: + return pd.read_json(f, **kw) except (IsADirectoryError, PermissionError): return load_dir() @@ -262,7 +252,7 @@ def load_dir() -> pd.DataFrame: def _load_json( p: FileParser, columns: Any = None, **kwargs: Any ) -> Tuple[pd.DataFrame, Any]: - pdf = _safe_load_json(p.uri, **kwargs).reset_index(drop=True) + pdf = _safe_load_json(p, **kwargs).reset_index(drop=True) if columns is None: return pdf, None if isinstance(columns, list): # column names diff --git a/tests/fugue/utils/test_io.py b/tests/fugue/utils/test_io.py index ac171141..e5e3ff98 100644 --- a/tests/fugue/utils/test_io.py +++ b/tests/fugue/utils/test_io.py @@ -14,11 +14,10 @@ @pytest.mark.skipif(sys.platform.startswith("win"), reason="not a test for windows") def test_file_parser_linux(): f = FileParser("/a/b/c.parquet") - assert "/a/b/c.parquet" == f.uri - assert "/a/b/c.parquet" == f.uri_with_glob + assert "file:///a/b/c.parquet" == f.path + assert not f.has_glob assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert 
"" == f.glob_pattern assert "file:///a/b" == f.parent @@ -27,59 +26,54 @@ def test_file_parser_linux(): ) def test_file_parser_win(): f = FileParser("c:\\a\\c.parquet") - assert "c:\\a\\c.parquet" == f.uri - assert "c:\\a\\c.parquet" == f.uri_with_glob + assert "file://c:/a/c.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "" == f.glob_pattern - assert "c:\\a" == f.parent + assert not f.has_glob + assert "file://c:/a" == f.parent f = FileParser("c:\\a\\*.parquet") - assert "c:\\a" == f.uri - assert "c:\\a\\*.parquet" == f.uri_with_glob + assert "file://c:/a" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "*.parquet" == f.glob_pattern - assert "c:\\" == f.parent + assert f.has_glob + assert "file://c:" == f.parent def test_file_parser(): f = FileParser("c.parquet") - assert "c.parquet" == f.uri - assert "c.parquet" == f.uri_with_glob + assert "c.parquet" == f.raw_path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "" == f.glob_pattern - #assert "." == f.parent + # assert "." == f.parent - f = FileParser("memory:///c.parquet") - assert "memory:///c.parquet" == f.uri - assert "memory:///c.parquet" == f.uri_with_glob + f = FileParser("memory://c.parquet") + assert "memory://c.parquet" == f.raw_path + assert "memory:///c.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "" == f.glob_pattern assert "memory:///" == f.parent for k, v in _FORMAT_MAP.items(): f = FileParser(f"s3://a/b/c{k}") - assert f"s3://a/b/c{k}" == f.uri + assert f"s3://a/b/c{k}" == f.raw_path assert k == f.suffix assert v == f.file_format assert "s3://a/b" == f.parent f = FileParser("s3://a/b/c.test.parquet") - assert "s3://a/b/c.test.parquet" == f.uri + assert "s3://a/b/c.test.parquet" == f.raw_path assert ".test.parquet" == f.suffix assert "parquet" == f.file_format assert "s3://a/b" == f.parent f = FileParser("s3://a/b/c.ppp.gz", "csv") - assert "s3://a/b/c.ppp.gz" == f.uri + assert "s3://a/b/c.ppp.gz" == f.raw_path assert ".ppp.gz" == f.suffix assert "csv" == f.file_format f = FileParser("s3://a/b/c", "csv") - assert "s3://a/b/c" == f.uri + assert "s3://a/b/c" == f.raw_path assert "" == f.suffix assert "csv" == f.file_format @@ -91,42 +85,39 @@ def test_file_parser(): @pytest.mark.skipif(sys.platform.startswith("win"), reason="not a test for windows") def test_file_parser_glob_linux(): f = FileParser("/a/b/*.parquet") - assert "/a/b" == f.uri + assert "file:///a/b/*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "*.parquet" == f.glob_pattern - assert "/a/b/*.parquet" == f.uri_with_glob + assert f.has_glob f = FileParser("/a/b/*123.parquet") - assert "/a/b" == f.uri + assert "file:///a/b/*123.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "*123.parquet" == f.glob_pattern - assert "/a/b/*123.parquet" == f.uri_with_glob + assert f.has_glob def test_file_parser_glob(): f = FileParser("s3://a/b/*.parquet") - assert "s3://a/b" == f.uri + assert "s3://a/b/*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "*.parquet" == f.glob_pattern - assert "s3://a/b/*.parquet" == f.uri_with_glob + assert f.has_glob - ff = FileParser("s3://a/b", "parquet").with_glob("*.csv", "csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b", "parquet").join("*.csv", "csv") + assert "s3://a/b/*.csv" == ff.path assert "csv" == ff.file_format - ff = 
FileParser("s3://a/b/", "csv").with_glob("*.csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b/", "csv").join("*.csv") + assert "s3://a/b/*.csv" == ff.path assert "csv" == ff.file_format - ff = FileParser("s3://a/b/*.parquet").with_glob("*.csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b/*.parquet").join("*.csv") + assert "s3://a/b/*.csv" == ff.path assert "csv" == ff.file_format - ff = FileParser("s3://a/b/*.parquet", "parquet").with_glob("*.csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b/*.parquet", "parquet").join("*.csv") + assert "s3://a/b/*.csv" == ff.path assert "parquet" == ff.file_format - ff = FileParser("s3://a/b/*.parquet", "parquet").with_glob("*.csv", "csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b/*.parquet", "parquet").join("*.csv", "csv") + assert "s3://a/b/*.csv" == ff.path assert "csv" == ff.file_format From e5f1256ea78144bf1a3a821ed195a37c1122f2ef Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 2 Nov 2023 07:39:04 +0000 Subject: [PATCH 07/11] update --- fugue/_utils/io.py | 11 ++++++++++- fugue_dask/_io.py | 31 ++++++++++++++----------------- fugue_duckdb/_io.py | 18 ++++++++++-------- fugue_ray/_utils/io.py | 4 ++-- fugue_spark/_utils/io.py | 2 +- tests/fugue/utils/test_io.py | 12 ++++++------ 6 files changed, 43 insertions(+), 35 deletions(-) diff --git a/fugue/_utils/io.py b/fugue/_utils/io.py index 4f8bcf1e..64d3e8c5 100644 --- a/fugue/_utils/io.py +++ b/fugue/_utils/io.py @@ -1,8 +1,10 @@ +import os import pathlib from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union import pandas as pd from fsspec import AbstractFileSystem +from fsspec.implementations.local import LocalFileSystem from triad.collections.dict import ParamDict from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw @@ -18,7 +20,10 @@ def __init__(self, path: str, format_hint: Optional[str] = None): self._has_glob = "*" in path or "?" 
in path self._raw_path = path self._fs, self._fs_path = url_to_fs(path) - self._path = self._fs.unstrip_protocol(self._fs_path) + if not self.is_local: + self._path = self._fs.unstrip_protocol(self._fs_path) + else: + self._path = os.path.abspath(self._fs._strip_protocol(path)) if format_hint is None or format_hint == "": for k, v in _FORMAT_MAP.items(): @@ -41,6 +46,10 @@ def assert_no_glob(self) -> "FileParser": def has_glob(self): return self._has_glob + @property + def is_local(self): + return isinstance(self._fs, LocalFileSystem) + def join(self, path: str, format_hint: Optional[str] = None) -> "FileParser": if not self.has_glob: _path = join(self.path, path) diff --git a/fugue_dask/_io.py b/fugue_dask/_io.py index 812be54a..7198e87b 100644 --- a/fugue_dask/_io.py +++ b/fugue_dask/_io.py @@ -1,13 +1,13 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Union -import fsspec import pandas as pd from dask import dataframe as dd from fsspec import AbstractFileSystem from triad.collections.dict import ParamDict from triad.collections.schema import Schema + from triad.utils.assertion import assert_or_throw -from triad.utils.io import join, url_to_fs +from triad.utils.io import join, url_to_fs, makedirs from fugue._utils.io import FileParser, _get_single_files from fugue_dask.dataframe import DaskDataFrame @@ -64,7 +64,7 @@ def _save_parquet(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: "write_index": False, **kwargs, } - DASK_UTILS.to_parquet_friendly(df.native).to_parquet(p.uri, **params) + DASK_UTILS.to_parquet_friendly(df.native).to_parquet(p.path, **params) def _load_parquet( @@ -77,27 +77,26 @@ def _load_parquet( if pd.__version__ >= "1.5": dtype_backend = "pyarrow" if columns is None: - pdf = dd.read_parquet(p.uri, dtype_backend=dtype_backend, **params) + pdf = dd.read_parquet(p.path, dtype_backend=dtype_backend, **params) schema = Schema(pdf.head(1)) return pdf, schema if isinstance(columns, list): # column names pdf = dd.read_parquet( - p.uri, columns=columns, dtype_backend=dtype_backend, **params + p.path, columns=columns, dtype_backend=dtype_backend, **params ) schema = Schema(pdf.head(1)) return pdf, schema schema = Schema(columns) pdf = dd.read_parquet( - p.uri, columns=schema.names, dtype_backend=dtype_backend, **params + p.path, columns=schema.names, dtype_backend=dtype_backend, **params ) return pdf, schema def _save_csv(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: - fs, path = fsspec.core.url_to_fs(p.uri) - fs.makedirs(path, exist_ok=True) + makedirs(p.path, exist_ok=True) df.native.to_csv( - join(p.uri, "*.csv"), **{"index": False, "header": False, **kwargs} + p.join("*.csv").path, **{"index": False, "header": False, **kwargs} ) @@ -124,7 +123,7 @@ def _load_csv( # noqa: C901 header = kw["header"] del kw["header"] if str(header) in ["True", "0"]: - pdf = _safe_load_csv(p.uri, **{"header": 0, **kw}) + pdf = _safe_load_csv(p.path, **{"header": 0, **kw}) if columns is None: return pdf, None if isinstance(columns, list): # column names @@ -135,19 +134,18 @@ def _load_csv( # noqa: C901 if columns is None: raise ValueError("columns must be set if without header") if isinstance(columns, list): # column names - pdf = _safe_load_csv(p.uri, **{"header": None, "names": columns, **kw}) + pdf = _safe_load_csv(p.path, **{"header": None, "names": columns, **kw}) return pdf, None schema = Schema(columns) - pdf = _safe_load_csv(p.uri, **{"header": None, "names": schema.names, **kw}) + pdf = _safe_load_csv(p.path, **{"header": None, "names": 
schema.names, **kw}) return pdf, schema else: raise NotImplementedError(f"{header} is not supported") def _save_json(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: - fs, path = fsspec.core.url_to_fs(p.uri) - fs.makedirs(path, exist_ok=True) - df.native.to_json(join(p.uri, "*.json"), **kwargs) + makedirs(p.path, exist_ok=True) + df.native.to_json(p.join("*.json").path, **kwargs) def _safe_load_json(path: str, **kwargs: Any) -> dd.DataFrame: @@ -155,14 +153,13 @@ def _safe_load_json(path: str, **kwargs: Any) -> dd.DataFrame: return dd.read_json(path, **kwargs) except (IsADirectoryError, PermissionError): x = dd.read_json(join(path, "*.json"), **kwargs) - print(x.compute()) return x def _load_json( p: FileParser, columns: Any = None, **kwargs: Any ) -> Tuple[dd.DataFrame, Any]: - pdf = _safe_load_json(p.uri, **kwargs).reset_index(drop=True) + pdf = _safe_load_json(p.path, **kwargs).reset_index(drop=True) if columns is None: return pdf, None if isinstance(columns, list): # column names diff --git a/fugue_duckdb/_io.py b/fugue_duckdb/_io.py index 1c28e685..437f241c 100644 --- a/fugue_duckdb/_io.py +++ b/fugue_duckdb/_io.py @@ -18,10 +18,10 @@ from fugue_duckdb.dataframe import DuckDataFrame -def _get_single_files(fp: Iterable[FileParser], fmt: str) -> Iterable[FileParser]: +def _get_files(fp: Iterable[FileParser], fmt: str) -> Iterable[FileParser]: for f in fp: - if f.glob_pattern == "" and isdir(f.uri): - yield f.with_glob("*." + fmt, fmt) + if not f.has_glob and isdir(f.path): + yield from f.join("*." + fmt, fmt).find_all() else: yield f @@ -48,7 +48,7 @@ def load_df( if fp[0].file_format not in self._format_load: return load_df(uri, format_hint=format_hint, columns=columns, **kwargs) dfs: List[DuckDataFrame] = [] - for f in _get_single_files(fp, fp[0].file_format): + for f in _get_files(fp, fp[0].file_format): df = self._format_load[f.file_format](f, columns, **kwargs) dfs.append(df) rel = dfs[0].native @@ -86,6 +86,7 @@ def save_df( self._format_save[p.file_format](df, p, **kwargs) def _save_csv(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): + p.assert_no_glob() dn = TempTableName() df.native.create_view(dn.key) kw = ParamDict({k.lower(): v for k, v in kwargs.items()}) @@ -94,7 +95,7 @@ def _save_csv(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): for k, v in kw.items(): params.append(f"{k.upper()} " + encode_value_to_expr(v)) pm = ", ".join(params) - query = f"COPY {dn.key} TO {encode_value_to_expr(p.uri)} WITH ({pm})" + query = f"COPY {dn.key} TO {encode_value_to_expr(p.path)} WITH ({pm})" self._con.execute(query) def _load_csv( # noqa: C901 @@ -108,7 +109,7 @@ def _load_csv( # noqa: C901 ValueError("when csv has no header, columns must be specified"), ) kw.pop("auto_detect", None) - params: List[str] = [encode_value_to_expr(p.uri_with_glob)] + params: List[str] = [encode_value_to_expr(p.path)] kw["header"] = 1 if header else 0 kw["auto_detect"] = 1 if infer_schema else 0 if infer_schema: @@ -171,6 +172,7 @@ def _load_csv( # noqa: C901 return DuckDataFrame(self._con.from_query(query)) def _save_parquet(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): + p.assert_no_glob() dn = TempTableName() df.native.create_view(dn.key) kw = ParamDict({k.lower(): v for k, v in kwargs.items()}) @@ -179,7 +181,7 @@ def _save_parquet(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): for k, v in kw.items(): params.append(f"{k.upper()} " + encode_value_to_expr(v)) pm = ", ".join(params) - query = f"COPY {dn.key} TO {encode_value_to_expr(p.uri)}" + query = f"COPY 
{dn.key} TO {encode_value_to_expr(p.path)}" if len(params) > 0: query += f" WITH ({pm})" self._con.execute(query) @@ -188,7 +190,7 @@ def _load_parquet( self, p: FileParser, columns: Any = None, **kwargs: Any ) -> DuckDataFrame: kw = ParamDict({k.lower(): v for k, v in kwargs.items()}) - params: List[str] = [encode_value_to_expr(p.uri_with_glob)] + params: List[str] = [encode_value_to_expr(p.path)] if isinstance(columns, list): cols = ", ".join(encode_column_names(columns)) else: diff --git a/fugue_ray/_utils/io.py b/fugue_ray/_utils/io.py index eaafd687..5e53e095 100644 --- a/fugue_ray/_utils/io.py +++ b/fugue_ray/_utils/io.py @@ -50,7 +50,7 @@ def load_df( len(fmts) == 1, NotImplementedError("can't support multiple formats") ) fmt = fmts[0] - files = [f.uri for f in fp] + files = [f.path for f in fp] return self._loads[fmt](files, columns, **kwargs) def save_df( @@ -74,7 +74,7 @@ def save_df( if not force_single: df = self._prepartition(df, partition_spec=partition_spec) - self._saves[p.file_format](df=df, uri=p.uri, **kwargs) + self._saves[p.file_format](df=df, uri=p.path, **kwargs) else: ldf = df.as_local() makedirs(os.path.dirname(uri), exist_ok=True) diff --git a/fugue_spark/_utils/io.py b/fugue_spark/_utils/io.py index 0e288442..7a18b67d 100644 --- a/fugue_spark/_utils/io.py +++ b/fugue_spark/_utils/io.py @@ -39,7 +39,7 @@ def load_df( len(fmts) == 1, NotImplementedError("can't support multiple formats") ) fmt = fmts[0] - files = [f.uri for f in fp] + files = [f.path for f in fp] return self._loads[fmt](files, columns, **kwargs) def save_df( diff --git a/tests/fugue/utils/test_io.py b/tests/fugue/utils/test_io.py index e5e3ff98..4ba33e9a 100644 --- a/tests/fugue/utils/test_io.py +++ b/tests/fugue/utils/test_io.py @@ -14,7 +14,7 @@ @pytest.mark.skipif(sys.platform.startswith("win"), reason="not a test for windows") def test_file_parser_linux(): f = FileParser("/a/b/c.parquet") - assert "file:///a/b/c.parquet" == f.path + assert "/a/b/c.parquet" == f.path assert not f.has_glob assert ".parquet" == f.suffix assert "parquet" == f.file_format @@ -26,18 +26,18 @@ def test_file_parser_linux(): ) def test_file_parser_win(): f = FileParser("c:\\a\\c.parquet") - assert "file://c:/a/c.parquet" == f.path + assert "c:\\a\\c.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format assert not f.has_glob assert "file://c:/a" == f.parent f = FileParser("c:\\a\\*.parquet") - assert "file://c:/a" == f.path + assert "c:\\a\\*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format assert f.has_glob - assert "file://c:" == f.parent + assert "file://c:/a" == f.parent def test_file_parser(): @@ -85,13 +85,13 @@ def test_file_parser(): @pytest.mark.skipif(sys.platform.startswith("win"), reason="not a test for windows") def test_file_parser_glob_linux(): f = FileParser("/a/b/*.parquet") - assert "file:///a/b/*.parquet" == f.path + assert "/a/b/*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format assert f.has_glob f = FileParser("/a/b/*123.parquet") - assert "file:///a/b/*123.parquet" == f.path + assert "/a/b/*123.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format assert f.has_glob From 51ac86a36d4cbcef935337a7b16f35ac863e6e64 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 2 Nov 2023 07:39:19 +0000 Subject: [PATCH 08/11] update --- fugue_dask/_io.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fugue_dask/_io.py b/fugue_dask/_io.py index 7198e87b..1e320a80 100644 --- 
a/fugue_dask/_io.py +++ b/fugue_dask/_io.py @@ -5,9 +5,8 @@ from fsspec import AbstractFileSystem from triad.collections.dict import ParamDict from triad.collections.schema import Schema - from triad.utils.assertion import assert_or_throw -from triad.utils.io import join, url_to_fs, makedirs +from triad.utils.io import join, makedirs, url_to_fs from fugue._utils.io import FileParser, _get_single_files from fugue_dask.dataframe import DaskDataFrame From 6573e56e4cdeab075f7bcae8b322eb0b9cf69614 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Fri, 3 Nov 2023 06:15:42 +0000 Subject: [PATCH 09/11] update --- fugue/_utils/io.py | 3 +++ fugue_duckdb/_io.py | 3 +-- tests/fugue/utils/test_io.py | 12 ++++++++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/fugue/_utils/io.py b/fugue/_utils/io.py index 64d3e8c5..82cf4484 100644 --- a/fugue/_utils/io.py +++ b/fugue/_utils/io.py @@ -77,6 +77,9 @@ def suffix(self) -> str: def file_format(self) -> str: return self._format + def make_parent_dirs(self) -> None: + self._fs.makedirs(self._fs._parent(self._fs_path), exist_ok=True) + def find_all(self) -> Iterable["FileParser"]: if self.has_glob: for x in self._fs.glob(self._fs_path): diff --git a/fugue_duckdb/_io.py b/fugue_duckdb/_io.py index 437f241c..56d21373 100644 --- a/fugue_duckdb/_io.py +++ b/fugue_duckdb/_io.py @@ -81,8 +81,7 @@ def save_df( rm(uri, recursive=True) except Exception: # pragma: no cover pass - if not exists(p.parent): - makedirs(p.parent, exist_ok=True) + p.make_parent_dirs() self._format_save[p.file_format](df, p, **kwargs) def _save_csv(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): diff --git a/tests/fugue/utils/test_io.py b/tests/fugue/utils/test_io.py index 4ba33e9a..a99a9aff 100644 --- a/tests/fugue/utils/test_io.py +++ b/tests/fugue/utils/test_io.py @@ -3,7 +3,7 @@ import pytest from pytest import raises -from triad.utils.io import makedirs, read_text, touch +from triad.utils.io import makedirs, read_text, touch, exists from fugue._utils.io import _FORMAT_MAP, FileParser, load_df, save_df from fugue.dataframe.array_dataframe import ArrayDataFrame @@ -40,13 +40,21 @@ def test_file_parser_win(): assert "file://c:/a" == f.parent -def test_file_parser(): +def test_file_parser(tmpdir): f = FileParser("c.parquet") assert "c.parquet" == f.raw_path assert ".parquet" == f.suffix assert "parquet" == f.file_format # assert "." 
== f.parent
 
+    tp = os.path.join(str(tmpdir), "a", "b")
+    f = FileParser(os.path.join(tp, "c.parquet"))
+    assert not exists(tp)
+    f.make_parent_dirs()
+    assert exists(tp)
+    f.make_parent_dirs()
+    assert exists(tp)
+
     f = FileParser("memory://c.parquet")
     assert "memory://c.parquet" == f.raw_path
     assert "memory:///c.parquet" == f.path

From d1bed105f503c17818d0b62c7f7e6a1497c6fa95 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Fri, 3 Nov 2023 07:07:52 +0000
Subject: [PATCH 10/11] update docs

---
 RELEASE.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/RELEASE.md b/RELEASE.md
index 2bc60c1d..5519aded 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -2,6 +2,10 @@
 
 ## 0.8.7
 
+- [488](https://github.com/fugue-project/fugue/issues/488) Migrate from fs to fsspec
+- [521](https://github.com/fugue-project/fugue/issues/521) Add `as_dicts` to Fugue API
+- [516](https://github.com/fugue-project/fugue/issues/516) Use `_collect_as_arrow` for Spark `as_arrow`
+- [520](https://github.com/fugue-project/fugue/pull/520) Add Python 3.10 to Windows Tests
 - [506](https://github.com/fugue-project/fugue/issues/506) Adopt pandas `ExtensionDType`
 - [504](https://github.com/fugue-project/fugue/issues/504) Create Fugue pytest fixtures
 - [503](https://github.com/fugue-project/fugue/issues/503) Deprecate python 3.7 support

From 01f0e24a27c0264e15eb96058f7346c073271dbc Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Sun, 5 Nov 2023 05:41:54 +0000
Subject: [PATCH 11/11] update

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 09cb7f2e..1e5425df 100644
--- a/setup.py
+++ b/setup.py
@@ -31,7 +31,7 @@ def get_version() -> str:
     keywords="distributed spark dask ray sql dsl domain specific language",
     url="http://github.com/fugue-project/fugue",
     install_requires=[
-        "triad==0.9.2.dev7",
+        "triad==0.9.2.dev8",
         "adagio>=0.2.4",
         # sql dependencies
         "qpd>=0.4.4",
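Taken together, the series replaces the `fs.FileSystem` based path handling with fsspec throughout `FileParser` and the engine IO modules. The sketch below is not part of any patch; it only illustrates, against the final state of the code, the behavior that the updated `tests/fugue/utils/test_io.py` asserts: glob detection, `join` taking over from `with_glob`, and `find_all`, `open` and `make_parent_dirs` delegating to the underlying fsspec filesystem. The `memory://demo` path is an illustrative choice, and the `s3://` lines only manipulate path strings locally but assume `s3fs` is importable (it is added to `requirements.txt` in this series).

```python
# Sketch only: exercises the final, fsspec-backed FileParser from fugue._utils.io.
from fugue._utils.io import FileParser

# Glob detection and format inference come from the raw path.
f = FileParser("s3://a/b/*.parquet")
assert f.has_glob and f.file_format == "parquet"
assert f.parent == "s3://a/b"  # needs s3fs importable; no network call is made

# join() replaces with_glob(): with a glob present, the new segment is joined
# onto the parent, and the format hint can be overridden.
ff = f.join("*.csv", "csv")
assert ff.path == "s3://a/b/*.csv" and ff.file_format == "csv"

# find_all(), open() and make_parent_dirs() delegate to the fsspec filesystem;
# the in-memory filesystem keeps the example free of local-disk side effects.
out = FileParser("memory://demo/out.csv")
out.make_parent_dirs()        # fsspec makedirs on memory://demo
with out.open("w") as fh:     # open() rejects glob paths via assert_no_glob()
    fh.write("0,a\n1,b\n")
found = [x.path for x in FileParser("memory://demo/*.csv").find_all()]
assert "memory:///demo/out.csv" in found
```

Writing through the in-memory filesystem mirrors what the new `tmp_mem_dir` fixture provides to the test suites, so the example creates no local files.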