Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use only loop executor for fsspec source #999

Merged
merged 36 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e397896
install optional fsspec backends in the CI for some python versions
lobis Oct 19, 2023
a93eb6c
Merge branch 'main' into fsspec-optional-backends
lobis Oct 19, 2023
c674789
update submit interface
lobis Oct 19, 2023
8da9c09
annotation
lobis Oct 19, 2023
3242517
do not use named arguments for the path/url
lobis Oct 19, 2023
59bec72
Future init
lobis Oct 19, 2023
2a97904
add s3fs and sshfs as test dependencies (fsspec beckends)
lobis Oct 19, 2023
181fa74
Merge remote-tracking branch 'origin/source-futures-submit' into fssp…
lobis Oct 19, 2023
e07231f
test fsspec s3 for more combination of parameters
lobis Oct 19, 2023
5fa2dcc
remove pip install from ci
lobis Oct 19, 2023
b3ddcc2
Merge branch 'main' into fsspec-optional-backends
lobis Oct 19, 2023
8a842c9
style: pre-commit fixes
pre-commit-ci[bot] Oct 19, 2023
3fea743
Merge branch 'main' into fsspec-optional-backends
lobis Oct 19, 2023
662b0f9
revert test order
lobis Oct 19, 2023
8cd55dd
remove dependencies as a test
lobis Oct 20, 2023
ae0a96e
add s3fs to test
lobis Oct 20, 2023
ba58b8f
exclude s3fs to python version 3.12
lobis Oct 20, 2023
3b7dfeb
add sshfs test (skipped)
lobis Oct 20, 2023
a60ca28
fix pytest
lobis Oct 20, 2023
cbbd07d
asyncio not available in 3.12
lobis Oct 20, 2023
06bbf92
asyncio not available in 3.12
lobis Oct 20, 2023
e9c71ac
add comment for fsspec threads
lobis Oct 20, 2023
37e5a24
attempt to close resources
lobis Oct 23, 2023
d2fb8a4
handle s3fs case separate for now
lobis Oct 23, 2023
d30e531
attempt to pass tests
lobis Oct 23, 2023
ee5eeab
attempt to pass tests
lobis Oct 23, 2023
993f979
simplified
lobis Oct 23, 2023
26924e2
remove support for use_threads option, run non-async fs in threads us…
lobis Oct 24, 2023
d898dbc
stop the loop on resource shutdown
lobis Oct 24, 2023
58fe0f0
add skip for xrootd due to server issues
lobis Oct 24, 2023
a8550eb
Merge remote-tracking branch 'origin/main' into fsspec-optional-backends
lobis Oct 25, 2023
09aa93a
remove skip for xrootd
lobis Oct 25, 2023
2e200b5
remove shutdown
lobis Oct 25, 2023
45fddff
Merge branch 'main' into fsspec-optional-backends
lobis Oct 25, 2023
2f73c45
merge and fix conflicts
lobis Oct 26, 2023
27ca078
understand ci fail
lobis Oct 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ dev = [
test = [
"lz4",
"minio",
"aiohttp; python_version<\"3.12\"",
"aiohttp; python_version<\"3.12\"", # asyncio not available
"fsspec",
"fsspec-xrootd",
"s3fs; python_version<\"3.12\"", # asyncio not available
"sshfs; python_version<\"3.12\"", # asyncio not available
"pytest>=6",
"pytest-timeout",
"pytest-rerunfailures",
Expand Down
84 changes: 40 additions & 44 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,19 @@ class FSSpecSource(uproot.source.chunk.Source):
"""

def __init__(self, file_path: str, **options):
import fsspec.asyn
import fsspec.core

default_options = uproot.reading.open.defaults
self._use_threads = options.get("use_threads", default_options["use_threads"])
self._num_workers = options.get("num_workers", default_options["num_workers"])
# Add the possibility to set use_async directly as a hidden option.
# It is not encouraged to do so but may be useful for testing purposes.
self._use_async = options.get("use_async", None) if self._use_threads else False

# TODO: is timeout always valid?

# Remove uproot-specific options (should be done earlier)
exclude_keys = set(default_options.keys())
storage_options = {k: v for k, v in options.items() if k not in exclude_keys}

protocol = fsspec.core.split_protocol(file_path)[0]
fs_has_async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl
# If not explicitly set (default), use async if possible
self._use_async = (
fs_has_async_impl if self._use_async is None else self._use_async
)
if self._use_async and not fs_has_async_impl:
# This should never be triggered unless the user explicitly set the `use_async` flag for a non-async backend
raise ValueError(f"Filesystem {protocol} does not support async")

if not self._use_threads:
self._executor = uproot.source.futures.TrivialExecutor()
elif self._use_async:
self._executor = FSSpecLoopExecutor(fsspec.asyn.get_loop())
else:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._num_workers
)
self._async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl
self._executor = FSSpecLoopExecutor()

self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **storage_options)

# TODO: set mode to "read-only" in a way that works for all filesystems
self._file = self._fs.open(self._file_path)
self._fh = None
self._num_requests = 0
Expand Down Expand Up @@ -148,28 +123,49 @@ def chunks(
self._num_requested_chunks += len(ranges)
self._num_requested_bytes += sum(stop - start for start, stop in ranges)

try:
# not available in python 3.8
to_thread = asyncio.to_thread
except AttributeError:
import contextvars
import functools

async def to_thread(func, /, *args, **kwargs):
loop = asyncio.get_running_loop()
ctx = contextvars.copy_context()
func_call = functools.partial(ctx.run, func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)
lobis marked this conversation as resolved.
Show resolved Hide resolved

async def async_wrapper_thread(blocking_func, *args, **kwargs):
if not callable(blocking_func):
raise TypeError("blocking_func must be callable")
# TODO: when python 3.8 is dropped, use `asyncio.to_thread` instead (also remove the try/except block above)
return await to_thread(blocking_func, *args, **kwargs)

chunks = []
for start, stop in ranges:
# _cat_file is async while cat_file is not.
# Loop executor takes a coroutine while ThreadPoolExecutor takes a function.
future = self._executor.submit(
self._fs._cat_file if self._use_async else self._fs.cat_file,
# it is assumed that the first argument is the file path / url (can have different names: 'url', 'path')
self._file_path,
start=start,
end=stop,
coroutine = (
self._fs._cat_file(self._file_path, start=start, end=stop)
if self._async_impl
else async_wrapper_thread(
self._fs.cat_file, self._file_path, start=start, end=stop
)
)

future = self._executor.submit(coroutine)

chunk = uproot.source.chunk.Chunk(self, start, stop, future)
future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
chunks.append(chunk)
return chunks

@property
def use_async(self) -> bool:
def async_impl(self) -> bool:
"""
True if using an async loop executor; False otherwise.
"""
return self._use_async
return self._async_impl

@property
def num_bytes(self) -> int:
Expand All @@ -190,13 +186,13 @@ def closed(self) -> bool:


class FSSpecLoopExecutor(uproot.source.futures.Executor):
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop = loop
@property
def loop(self) -> asyncio.AbstractEventLoop:
import fsspec.asyn

return fsspec.asyn.get_loop()

def submit(self, coroutine, /, *args, **kwargs) -> concurrent.futures.Future:
if not asyncio.iscoroutinefunction(coroutine):
def submit(self, coroutine) -> concurrent.futures.Future:
if not asyncio.iscoroutine(coroutine):
raise TypeError("loop executor can only submit coroutines")
if not self.loop.is_running():
raise RuntimeError("cannot submit coroutine while loop is not running")
coroutine_object = coroutine(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coroutine_object, self.loop)
return asyncio.run_coroutine_threadsafe(coroutine, self.loop)
47 changes: 29 additions & 18 deletions tests/test_0692_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@
import queue


@pytest.mark.parametrize("use_threads", [True, False])
def test_open_fsspec_http(server, use_threads):
def test_open_fsspec_http(server):
pytest.importorskip("aiohttp")

url = f"{server}/uproot-issue121.root"

with uproot.open(
url,
handler=uproot.source.fsspec.FSSpecSource,
use_threads=use_threads,
) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40
Expand All @@ -36,51 +33,65 @@ def test_open_fsspec_github():
assert len(data) == 40


@pytest.mark.parametrize("use_threads", [True, False])
def test_open_fsspec_local(use_threads):
def test_open_fsspec_local():
local_path = skhep_testdata.data_path("uproot-issue121.root")

with uproot.open(
local_path,
handler=uproot.source.fsspec.FSSpecSource,
use_threads=use_threads,
) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40


@pytest.mark.network
def test_open_fsspec_s3():
@pytest.mark.parametrize(
"handler",
[
# uproot.source.fsspec.FSSpecSource,
uproot.source.s3.S3Source,
None,
],
)
def test_open_fsspec_s3(handler):
pytest.importorskip("s3fs")

with uproot.open(
"s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst",
anon=True,
handler=uproot.source.fsspec.FSSpecSource,
handler=handler,
) as f:
data = f["Event/Event.mEventId"].array(library="np")
assert len(data) == 8004


@pytest.mark.parametrize("handler", [uproot.source.fsspec.FSSpecSource, None])
@pytest.mark.skip("you must provide an ssh server to test this")
def test_open_fsspec_ssh(handler):
pytest.importorskip("sshfs")

# change this to a server you have access to
uri = "ssh://user@host:22/tmp/file.root"
with uproot.open(uri, handler=handler) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40


@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize(
"handler, use_threads",
"handler",
[
(uproot.source.fsspec.FSSpecSource, True),
(uproot.source.fsspec.FSSpecSource, False),
(uproot.source.xrootd.XRootDSource, True),
(uproot.source.xrootd.XRootDSource, False),
(None, True),
(None, False),
uproot.source.fsspec.FSSpecSource,
uproot.source.xrootd.XRootDSource,
None,
],
)
def test_open_fsspec_xrootd(handler, use_threads):
def test_open_fsspec_xrootd(handler):
pytest.importorskip("XRootD")
with uproot.open(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
handler=handler,
use_threads=use_threads,
) as f:
data = f["Events/run"].array(library="np", entry_stop=20)
assert len(data) == 20
Expand Down
Loading