[WIP] ENH: support reading directory in read_csv #61275

Open: wants to merge 17 commits into main
130 changes: 129 additions & 1 deletion pandas/io/common.py
@@ -9,7 +9,9 @@
import codecs
from collections import defaultdict
from collections.abc import (
    Generator,
    Hashable,
    Iterable,
    Mapping,
    Sequence,
)
@@ -26,7 +28,10 @@
)
import mmap
import os
from pathlib import Path
from pathlib import (
    Path,
    PurePosixPath,
)
import re
import tarfile
from typing import (
@@ -55,6 +60,7 @@
    BaseBuffer,
    ReadCsvBuffer,
)
from pandas.compat import is_platform_windows
from pandas.compat._optional import import_optional_dependency
from pandas.util._decorators import doc
from pandas.util._exceptions import find_stack_level
@@ -1282,3 +1288,125 @@ def dedup_names(
        counts[col] = cur_count + 1

    return names


def _infer_protocol(path: str) -> str:
    # Treat Windows drive letters like C:\ as local file paths
    if is_platform_windows() and re.match(r"^[a-zA-Z]:[\\/]", path):
        return "file"

    if is_fsspec_url(path):
        parsed = parse_url(path)
        return parsed.scheme
    return "file"


def _match_file(
    path: Path | PurePosixPath, extensions: set[str] | None, glob: str | None
) -> bool:
    """Check if the file matches the given extensions and glob pattern.

    Parameters
    ----------
    path : Path or PurePosixPath
        The file path to check.
    extensions : set[str] or None
        A set of lowercase file extensions to match against.
        If None, any extension matches.
    glob : str or None
        A glob pattern to match against.
        If None, any file name matches.

    Returns
    -------
    bool
        True if the file matches the extensions and glob pattern, False otherwise.
    """
    return (extensions is None or path.suffix.lower() in extensions) and (
        glob is None or path.match(glob)
    )


def iterdir(
    path: FilePath,
    extensions: str | Iterable[str] | None = None,
    glob: str | None = None,
) -> Generator[Path | PurePosixPath]:
    """Yield file paths in a directory (no nesting allowed).

    Supports:
    - Local paths (str, os.PathLike)
    - file:// URLs
    - Remote paths (e.g., s3://) via fsspec (if installed)

    Parameters
    ----------
    path : FilePath
        Path to the directory (local or remote).
    extensions : str or list of str, optional
        Only yield files with the given extension(s). Case-insensitive.
        If None, all files are yielded.
    glob : str, optional
        Only yield files matching the given glob pattern.
        If None, all files are yielded.

    Yields
    ------
    pathlib.Path or pathlib.PurePosixPath
        File paths within the directory.

    Raises
    ------
    NotADirectoryError
        If the given path is not a directory.
    ImportError
        If fsspec is required but not installed.
    """
    if extensions is not None:
        if isinstance(extensions, str):
            extensions = {extensions.lower()}
        else:
            extensions = {ext.lower() for ext in extensions}

    path_str = os.fspath(path)
    scheme = _infer_protocol(path_str)

    if scheme == "file":
        resolved_path = Path(path_str)
        if resolved_path.is_file():
            if _match_file(
                resolved_path,
                extensions,
                glob,
            ):
                yield resolved_path
            return

        for entry in resolved_path.iterdir():
            if entry.is_file():
                if _match_file(
                    entry,
                    extensions,
                    glob,
                ):
                    yield entry
        return

    # Remote paths
    fsspec = import_optional_dependency("fsspec", extra=scheme)
    fs = fsspec.filesystem(scheme)
    path_without_scheme = fsspec.core.strip_protocol(path_str)
    if fs.isfile(path_without_scheme):
        # Wrap the stripped string in PurePosixPath so _match_file can read .suffix
        if _match_file(
            PurePosixPath(path_without_scheme),
            extensions,
            glob,
        ):
            yield PurePosixPath(path_without_scheme)
        return

    for file in fs.ls(path_without_scheme, detail=True):
        if file["type"] == "file":
            path_obj = PurePosixPath(file["name"])
            if _match_file(
                path_obj,
                extensions,
                glob,
            ):
                yield path_obj
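
For orientation, here is a minimal usage sketch of the new helper (not part of the diff; it assumes iterdir is importable from pandas.io.common as in the change above, and the directory path is hypothetical):

from pandas.io.common import iterdir

# Yield only the CSV files sitting directly inside a local directory
# (iterdir does not recurse into subdirectories).
for p in iterdir("/tmp/my_csvs", extensions=".csv"):
    print(p)

# An optional glob narrows the selection further; extension matching is
# case-insensitive, so a file named part-1.CSV would also be yielded here.
for p in iterdir("/tmp/my_csvs", extensions=[".csv"], glob="part-*.csv"):
    print(p)

# Remote locations such as s3://bucket/prefix go through fsspec and yield
# PurePosixPath objects instead of Path objects.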
29 changes: 29 additions & 0 deletions pandas/tests/io/conftest.py
@@ -223,3 +223,32 @@ def compression_format(request):
@pytest.fixture(params=_compression_formats_params)
def compression_ext(request):
    return request.param[0]


@pytest.fixture
def local_csv_directory(tmp_path):
    """
    Fixture to create a directory with dummy CSV files for testing.
    """
    for i in range(3):
        file_path = tmp_path / f"{i}.csv"
        file_path.touch()
    return tmp_path


@pytest.fixture
def remote_csv_directory(monkeypatch):
    _ = pytest.importorskip("fsspec", reason="fsspec is required for remote tests")

    from fsspec.implementations.memory import MemoryFileSystem

    fs = MemoryFileSystem()
    fs.store.clear()

    dir_name = "remote-bucket"
    fs.pipe(f"{dir_name}/a.csv", b"a,b,c\n1,2,3\n")
    fs.pipe(f"{dir_name}/b.csv", b"a,b,c\n4,5,6\n")
    fs.pipe(f"{dir_name}/nested/ignored.csv", b"x,y,z\n")

    monkeypatch.setattr("fsspec.filesystem", lambda _: fs)
    return f"s3://{dir_name}"
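
For context on how these fixtures are meant to be consumed (a sketch, not part of the diff, with an illustrative test name): because fsspec.filesystem is monkeypatched to return the in-memory filesystem for every scheme, iterdir can be exercised against an s3:// URL without network access or credentials.

import pandas.io.common as icom

def test_remote_iterdir_sketch(remote_csv_directory):
    # "s3" resolves to the monkeypatched MemoryFileSystem, so only the two
    # top-level CSV files are yielded; nested/ignored.csv sits under a
    # "directory" entry and is skipped because iterdir does not recurse.
    names = sorted(p.name for p in icom.iterdir(remote_csv_directory))
    assert names == ["a.csv", "b.csv"]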
37 changes: 37 additions & 0 deletions pandas/tests/io/parser/test_directory.py
@@ -0,0 +1,37 @@
from csv import (
    DictWriter,
    reader as csv_reader,
)

import pytest


@pytest.fixture
def directory_data():
    return ["a", "b", "c"], [
        {"first": {"a": 1, "b": 2, "c": 3}},
        {"second": {"a": 4, "b": 5, "c": 6}},
        {"third": {"a": 7, "b": 8, "c": 9}},
    ]


@pytest.fixture
def directory_data_to_file(tmp_path, directory_data):
    field_names, data_list = directory_data
    for data in data_list:
        file_name = next(iter(data.keys()))
        path = tmp_path / f"{file_name}.csv"
        with path.open("w", newline="", encoding="utf-8") as file:
            writer = DictWriter(file, fieldnames=field_names)
            writer.writeheader()
            writer.writerow(data[file_name])
    return tmp_path


def test_directory_data(directory_data_to_file):
    assert len(list(directory_data_to_file.iterdir())) == 3
    for file in directory_data_to_file.iterdir():
        with file.open(encoding="utf-8") as f:
            reader = csv_reader(f)
            header = next(reader)
            assert header == ["a", "b", "c"]
39 changes: 39 additions & 0 deletions pandas/tests/io/test_common.py
@@ -695,3 +695,42 @@ def test_pyarrow_read_csv_datetime_dtype():
    expect = pd.DataFrame({"date": expect_data})

    tm.assert_frame_equal(expect, result)


def test_iterdir_local(local_csv_directory):
    for file in icom.iterdir(local_csv_directory):
        assert file.is_file()
        assert file.suffix == ".csv"


def test_remote_csv_directory(remote_csv_directory):
    import fsspec
    from fsspec.implementations.memory import MemoryFileSystem

    fs = fsspec.filesystem("s3")
    assert isinstance(fs, MemoryFileSystem)

    assert fs.exists("remote-bucket")
    assert fs.isdir("remote-bucket")

    files = fs.ls("remote-bucket", detail=True)

    file_names = sorted(f["name"] for f in files if f["type"] == "file")
    assert file_names == ["/remote-bucket/a.csv", "/remote-bucket/b.csv"]

    dir_names = [f["name"] for f in files if f["type"] == "directory"]
    assert "/remote-bucket/nested" in dir_names

    nested_files = fs.ls("remote-bucket/nested", detail=True)
    assert nested_files[0]["name"] == "/remote-bucket/nested/ignored.csv"


def test_iterdir_remote(remote_csv_directory):
    import fsspec

    fs = fsspec.filesystem("s3")
    for file in icom.iterdir(remote_csv_directory):
        # for fsspec<2024.5.0, fs.isfile(PurePosixPath) returns False
        assert fs.exists(str(file))
        assert file.suffix == ".csv"
        assert fs.isfile(str(file))