Skip to content

Commit

Permalink
feat: use only loop executor for fsspec source (#999)
Browse files Browse the repository at this point in the history
* install optional fsspec backends in the CI for some python versions

* update submit interface

* annotation

* do not use named arguments for the path/url

* Future init

* add s3fs and sshfs as test dependencies (fsspec beckends)

* test fsspec s3 for more combination of parameters

* remove pip install from ci

* style: pre-commit fixes

* revert test order

* remove dependencies as a test

* add s3fs to test

* exclude s3fs to python version 3.12

* add sshfs test (skipped)

* fix pytest

* asyncio not available in 3.12

* asyncio not available in 3.12

* add comment for fsspec threads

* attempt to close resources

* handle s3fs case separate for now

* attempt to pass tests

* attempt to pass tests

* simplified

* remove support for use_threads option, run non-async fs in threads using asyncio

* stop the loop on resource shutdown

* add skip for xrootd due to server issues

* remove skip for xrootd

* remove shutdown

* understand ci fail

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
lobis and pre-commit-ci[bot] authored Oct 31, 2023
1 parent 3341ce0 commit 0f2c4da
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 63 deletions.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ dev = [
test = [
"lz4",
"minio",
"aiohttp; python_version<\"3.12\"",
"aiohttp; python_version<\"3.12\"", # asyncio not available
"fsspec",
"fsspec-xrootd",
"s3fs; python_version<\"3.12\"", # asyncio not available
"sshfs; python_version<\"3.12\"", # asyncio not available
"pytest>=6",
"pytest-timeout",
"pytest-rerunfailures",
Expand Down
84 changes: 40 additions & 44 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,19 @@ class FSSpecSource(uproot.source.chunk.Source):
"""

def __init__(self, file_path: str, **options):
import fsspec.asyn
import fsspec.core

default_options = uproot.reading.open.defaults
self._use_threads = options.get("use_threads", default_options["use_threads"])
self._num_workers = options.get("num_workers", default_options["num_workers"])
# Add the possibility to set use_async directly as a hidden option.
# It is not encouraged to do so but may be useful for testing purposes.
self._use_async = options.get("use_async", None) if self._use_threads else False

# TODO: is timeout always valid?

# Remove uproot-specific options (should be done earlier)
exclude_keys = set(default_options.keys())
storage_options = {k: v for k, v in options.items() if k not in exclude_keys}

protocol = fsspec.core.split_protocol(file_path)[0]
fs_has_async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl
# If not explicitly set (default), use async if possible
self._use_async = (
fs_has_async_impl if self._use_async is None else self._use_async
)
if self._use_async and not fs_has_async_impl:
# This should never be triggered unless the user explicitly set the `use_async` flag for a non-async backend
raise ValueError(f"Filesystem {protocol} does not support async")

if not self._use_threads:
self._executor = uproot.source.futures.TrivialExecutor()
elif self._use_async:
self._executor = FSSpecLoopExecutor(fsspec.asyn.get_loop())
else:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._num_workers
)
self._async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl
self._executor = FSSpecLoopExecutor()

self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **storage_options)

# TODO: set mode to "read-only" in a way that works for all filesystems
self._file = self._fs.open(self._file_path)
self._fh = None
self._num_requests = 0
Expand Down Expand Up @@ -148,28 +123,49 @@ def chunks(
self._num_requested_chunks += len(ranges)
self._num_requested_bytes += sum(stop - start for start, stop in ranges)

try:
# not available in python 3.8
to_thread = asyncio.to_thread
except AttributeError:
import contextvars
import functools

async def to_thread(func, /, *args, **kwargs):
loop = asyncio.get_running_loop()
ctx = contextvars.copy_context()
func_call = functools.partial(ctx.run, func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)

async def async_wrapper_thread(blocking_func, *args, **kwargs):
if not callable(blocking_func):
raise TypeError("blocking_func must be callable")
# TODO: when python 3.8 is dropped, use `asyncio.to_thread` instead (also remove the try/except block above)
return await to_thread(blocking_func, *args, **kwargs)

chunks = []
for start, stop in ranges:
# _cat_file is async while cat_file is not.
# Loop executor takes a coroutine while ThreadPoolExecutor takes a function.
future = self._executor.submit(
self._fs._cat_file if self._use_async else self._fs.cat_file,
# it is assumed that the first argument is the file path / url (can have different names: 'url', 'path')
self._file_path,
start=start,
end=stop,
coroutine = (
self._fs._cat_file(self._file_path, start=start, end=stop)
if self._async_impl
else async_wrapper_thread(
self._fs.cat_file, self._file_path, start=start, end=stop
)
)

future = self._executor.submit(coroutine)

chunk = uproot.source.chunk.Chunk(self, start, stop, future)
future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
chunks.append(chunk)
return chunks

@property
def use_async(self) -> bool:
def async_impl(self) -> bool:
"""
True if using an async loop executor; False otherwise.
"""
return self._use_async
return self._async_impl

@property
def num_bytes(self) -> int:
Expand All @@ -190,13 +186,13 @@ def closed(self) -> bool:


class FSSpecLoopExecutor(uproot.source.futures.Executor):
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop = loop
@property
def loop(self) -> asyncio.AbstractEventLoop:
import fsspec.asyn

return fsspec.asyn.get_loop()

def submit(self, coroutine, /, *args, **kwargs) -> concurrent.futures.Future:
if not asyncio.iscoroutinefunction(coroutine):
def submit(self, coroutine) -> concurrent.futures.Future:
if not asyncio.iscoroutine(coroutine):
raise TypeError("loop executor can only submit coroutines")
if not self.loop.is_running():
raise RuntimeError("cannot submit coroutine while loop is not running")
coroutine_object = coroutine(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coroutine_object, self.loop)
return asyncio.run_coroutine_threadsafe(coroutine, self.loop)
47 changes: 29 additions & 18 deletions tests/test_0692_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@
import queue


@pytest.mark.parametrize("use_threads", [True, False])
def test_open_fsspec_http(server, use_threads):
def test_open_fsspec_http(server):
pytest.importorskip("aiohttp")

url = f"{server}/uproot-issue121.root"

with uproot.open(
url,
handler=uproot.source.fsspec.FSSpecSource,
use_threads=use_threads,
) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40
Expand All @@ -36,51 +33,65 @@ def test_open_fsspec_github():
assert len(data) == 40


@pytest.mark.parametrize("use_threads", [True, False])
def test_open_fsspec_local(use_threads):
def test_open_fsspec_local():
local_path = skhep_testdata.data_path("uproot-issue121.root")

with uproot.open(
local_path,
handler=uproot.source.fsspec.FSSpecSource,
use_threads=use_threads,
) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40


@pytest.mark.network
def test_open_fsspec_s3():
@pytest.mark.parametrize(
"handler",
[
# uproot.source.fsspec.FSSpecSource,
uproot.source.s3.S3Source,
None,
],
)
def test_open_fsspec_s3(handler):
pytest.importorskip("s3fs")

with uproot.open(
"s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst",
anon=True,
handler=uproot.source.fsspec.FSSpecSource,
handler=handler,
) as f:
data = f["Event/Event.mEventId"].array(library="np")
assert len(data) == 8004


@pytest.mark.parametrize("handler", [uproot.source.fsspec.FSSpecSource, None])
@pytest.mark.skip("you must provide an ssh server to test this")
def test_open_fsspec_ssh(handler):
pytest.importorskip("sshfs")

# change this to a server you have access to
uri = "ssh://user@host:22/tmp/file.root"
with uproot.open(uri, handler=handler) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40


@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize(
"handler, use_threads",
"handler",
[
(uproot.source.fsspec.FSSpecSource, True),
(uproot.source.fsspec.FSSpecSource, False),
(uproot.source.xrootd.XRootDSource, True),
(uproot.source.xrootd.XRootDSource, False),
(None, True),
(None, False),
uproot.source.fsspec.FSSpecSource,
uproot.source.xrootd.XRootDSource,
None,
],
)
def test_open_fsspec_xrootd(handler, use_threads):
def test_open_fsspec_xrootd(handler):
pytest.importorskip("XRootD")
with uproot.open(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
handler=handler,
use_threads=use_threads,
) as f:
data = f["Events/run"].array(library="np", entry_stop=20)
assert len(data) == 20
Expand Down

0 comments on commit 0f2c4da

Please sign in to comment.