feat: use only loop executor for fsspec source #999

Merged
merged 36 commits into from
Oct 31, 2023

Conversation

lobis
Collaborator

@lobis lobis commented Oct 19, 2023

This PR removes support for different types of executors in the fsspec source. The use_threads and num_workers options are also dropped.

The loop executor exists only for compatibility with the remaining sources and does not hold any resources. The loop is taken directly from fsspec.asyn, and submit is essentially just asyncio.run_coroutine_threadsafe.
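For illustration, here is a minimal standard-library sketch of an executor of this kind. It is not the code in this PR: the names LoopExecutorSketch, start_background_loop and fetch_chunk are hypothetical, and the background loop only stands in for the loop that fsspec.asyn provides.

```python
import asyncio
import threading


def start_background_loop():
    """Start an event loop in a daemon thread (stand-in for the fsspec loop)."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


class LoopExecutorSketch:
    """Hypothetical executor: it owns no threads and only forwards to a loop."""

    def __init__(self, loop):
        self._loop = loop

    def submit(self, coroutine):
        # Schedule the coroutine on the loop from any thread and return a
        # concurrent.futures.Future that the caller can wait on.
        return asyncio.run_coroutine_threadsafe(coroutine, self._loop)


async def fetch_chunk(start, stop):
    # Placeholder for an asynchronous byte-range read.
    await asyncio.sleep(0.01)
    return bytes(range(start, stop))


loop = start_background_loop()
executor = LoopExecutorSketch(loop)
futures = [executor.submit(fetch_chunk(i, i + 4)) for i in (0, 4, 8)]
chunks = [future.result(timeout=5) for future in futures]

# Stop the stand-in loop; the executor itself has nothing to release.
loop.call_soon_threadsafe(loop.stop)
```

Because submit only forwards to a loop that already exists, the executor needs no shutdown logic of its own, which is consistent with the statement that it holds no resources.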

With this PR, whenever the fsspec filesystem does not support async calls, its methods are wrapped in a coroutine that runs them in a separate thread so that they do not block. This may spawn many short-lived threads, but in limited testing I haven't found this to be an issue. One alternative would be to run them as coroutines in the fsspec loop, but they wouldn't run concurrently because they are blocking calls. Another alternative would be to use something like https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor to run the blocking calls in an executor. Edit: #999 (comment)
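As a rough sketch of the wrapping idea, and not necessarily what this PR does, a blocking call can be turned into an awaitable with loop.run_in_executor, which also works on Python 3.8. The names blocking_read and as_coroutine are hypothetical, and the default thread pool is used here rather than one new thread per call.

```python
import asyncio
import functools
import time


def blocking_read(path, start, end):
    # Stand-in for a synchronous backend call; the sleep simulates I/O.
    time.sleep(0.2)
    return f"{path}[{start}:{end}]"


async def as_coroutine(func, *args, **kwargs):
    # Off-load the blocking call to the loop's default thread pool so the
    # event loop stays free to run other coroutines in the meantime.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


async def main():
    started = time.perf_counter()
    results = await asyncio.gather(
        *(as_coroutine(blocking_read, "file.root", i, i + 100) for i in (0, 100, 200))
    )
    return results, time.perf_counter() - started


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Calling blocking_read directly inside a coroutine would instead hold the loop for the full duration of each call, so the three reads would run one after another.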

This PR took a long time to finish, mainly because of an intermittent error in the CI. After much testing I am still not sure what causes it, but I think it has to do with s3fs specifically. I could not reproduce the error outside of pytest running on the uproot repo; I even created another repository with the same pytest code and uproot version and still could not reproduce it. Since it has proven so hard to trigger, and since I don't think this PR causes it (it can also be reproduced on the main branch of uproot), I think we can merge this PR. I will try to understand and fix the error in another PR where I enable the s3fs tests.

The code I used to reproduce this:

import fsspec
import uproot


def test_open_fsspec_s3_issue():
    # Reading a few bytes through the s3 backend first is what triggers the error.
    fs, path = fsspec.core.url_to_fs("s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root")
    # Swapping in either of the backends below does not trigger it:
    # fs, path = fsspec.core.url_to_fs("github://scikit-hep:scikit-hep-testdata@v0.4.33/src/skhep_testdata/data/uproot-issue121.root")
    # fs, path = fsspec.core.url_to_fs("https://github.com/scikit-hep/scikit-hep-testdata/raw/main/src/skhep_testdata/data/uproot-issue121.root")
    data = fs.cat_file(path, start=0, end=100)

    url = "s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst"

    for handler in [
        uproot.source.s3.S3Source,
        uproot.source.s3.S3Source,
        uproot.source.s3.S3Source,
        uproot.source.s3.S3Source,
        uproot.source.s3.S3Source,
        uproot.source.s3.S3Source,
    ]:
        with uproot.open(
                url,
                anon=True,
                handler=handler,
        ) as f:
            data = f["Event/Event.mEventId"].array(library="np")
            assert len(data) == 8004

Notice that at the start I define several alternative (fs, path) pairs and then read some bytes from the file. The error occurs only when this line runs with the s3 backend. Using a sync-only backend such as github, or another async one such as https, does not trigger the error, which is why I think s3fs is the culprit. When I disabled the test that used s3fs, the error also disappeared.

@lobis lobis force-pushed the fsspec-optional-backends branch from ab20f43 to e397896 Compare October 19, 2023 04:01
@lobis lobis changed the title test: install optional fsspec backends in the CI for some builds test: install fsspec optional backends such as s3fs Oct 19, 2023
@lobis lobis mentioned this pull request Oct 19, 2023
@lobis lobis changed the title test: install fsspec optional backends such as s3fs feat: use only loop executor for fsspec source Oct 24, 2023
@lobis
Collaborator Author

lobis commented Oct 30, 2023

Trace for the error:

Testing started at 13:05 ...
Launching pytest with arguments test_0692_fsspec.py::test_open_fsspec_s3_issue --no-header --no-summary -q in /Users/lobis/git/uproot/tests

============================= test session starts ==============================
collecting ... collected 1 item

test_0692_fsspec.py::test_open_fsspec_s3_issue 

============================== 1 failed in 17.62s ==============================
FAILED                    [100%]
tests/test_0692_fsspec.py:98 (test_open_fsspec_s3_issue)
cls = <class '_pytest.runner.CallInfo'>
func = <function call_runtest_hook.<locals>.<lambda> at 0x10774faf0>
when = 'call'
reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)

    @classmethod
    def from_call(
        cls,
        func: "Callable[[], TResult]",
        when: "Literal['collect', 'setup', 'call', 'teardown']",
        reraise: Optional[
            Union[Type[BaseException], Tuple[Type[BaseException], ...]]
        ] = None,
    ) -> "CallInfo[TResult]":
        """Call func, wrapping the result in a CallInfo.
    
        :param func:
            The function to call. Called without arguments.
        :param when:
            The phase in which the function is called.
        :param reraise:
            Exception or exceptions that shall propagate if raised by the
            function, instead of being wrapped in the CallInfo.
        """
        excinfo = None
        start = timing.time()
        precise_start = timing.perf_counter()
        try:
>           result: Optional[TResult] = func()

cls        = <class '_pytest.runner.CallInfo'>
duration   = 17.538951792
excinfo    = <ExceptionInfo PytestUnraisableExceptionWarning('Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x1021ab9d0>\n\nTra...g, source=self)\nResourceWarning: unclosed transport <asyncio.sslproto._SSLProtocolTransport object at 0x122382be0>\n') tblen=7>
func       = <function call_runtest_hook.<locals>.<lambda> at 0x10774faf0>
precise_start = 0.873500208
precise_stop = 18.412452
reraise    = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)
result     = None
start      = 1698685524.7358022
stop       = 1698685542.2744918
when       = 'call'

../../../miniconda3/envs/uproot-38/lib/python3.8/site-packages/_pytest/runner.py:341: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/uproot-38/lib/python3.8/site-packages/_pytest/runner.py:262: in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
        ihook      = <HookCaller 'pytest_runtest_call'>
        item       = <Function test_open_fsspec_s3_issue>
        kwds       = {}
../../../miniconda3/envs/uproot-38/lib/python3.8/site-packages/pluggy/_hooks.py:493: in __call__
    return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
        firstresult = False
        kwargs     = {'item': <Function test_open_fsspec_s3_issue>}
        self       = <HookCaller 'pytest_runtest_call'>
../../../miniconda3/envs/uproot-38/lib/python3.8/site-packages/pluggy/_manager.py:115: in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
        firstresult = False
        hook_name  = 'pytest_runtest_call'
        kwargs     = {'item': <Function test_open_fsspec_s3_issue>}
        methods    = [<HookImpl plugin_name='runner', plugin=<module '_pytest.runner' from '/Users/lobis/miniconda3/envs/uproot-38/lib/python3.8/site-packages/_pytest/runner.py'>>,
 <HookImpl plugin_name='skipping', plugin=<module '_pytest.skipping' from '/Users/lobis/miniconda3/envs/uproot-38/lib/python3.8/site-packages/_pytest/skipping.py'>>,
 <HookImpl plugin_name='timeout', plugin=<module 'pytest_timeout' from '/Users/lobis/miniconda3/envs/uproot-38/lib/python3.8/site-packages/pytest_timeout.py'>>,
 <HookImpl plugin_name='capturemanager', plugin=<CaptureManager _method='fd' _global_capturing=<MultiCapture out=<FDCapture 1 oldfd=5 _state='suspended' tmpfile=<_io.TextIOWrapper name="<_io.FileIO name=6 mode='rb+' closefd=True>" mode='r+' encoding='utf-8'>> err=<FDCapture 2 oldfd=7 _state='suspended' tmpfile=<_io.TextIOWrapper name="<_io.FileIO name=8 mode='rb+' closefd=True>" mode='r+' encoding='utf-8'>> in_=<FDCapture 0 oldfd=3 _state='started' tmpfile=<_io.TextIOWrapper name='/dev/null' mode='r' encoding='utf-8'>> _state='suspended' _in_suspended=False> _capture_fixture=None>>,
 <HookImpl plugin_name='logging-plugin', plugin=<_pytest.logging.LoggingPlugin object at 0x1076b9eb0>>,
 <HookImpl plugin_name='unraisableexception', plugin=<module '_pytest.unraisableexception' from '/Users/lobis/miniconda3/envs/uproot-38/lib/python3.8/site-packages/_pytest/unraisableexception.py'>>,
 <HookImpl plugin_name='threadexception', plugin=<module '_pytest.threadexception' from '/Users/lobis/miniconda3/envs/uproot-38/lib/python3.8/site-packages/_pytest/threadexception.py'>>]
        self       = <_pytest.config.PytestPluginManager object at 0x10350bd90>
../../../miniconda3/envs/uproot-38/lib/python3.8/site-packages/_pytest/unraisableexception.py:88: in pytest_runtest_call
    yield from unraisable_exception_runtest_hook()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def unraisable_exception_runtest_hook() -> Generator[None, None, None]:
        with catch_unraisable_exception() as cm:
            yield
            if cm.unraisable:
                if cm.unraisable.err_msg is not None:
                    err_msg = cm.unraisable.err_msg
                else:
                    err_msg = "Exception ignored in"
                msg = f"{err_msg}: {cm.unraisable.object!r}\n\n"
                msg += "".join(
                    traceback.format_exception(
                        cm.unraisable.exc_type,
                        cm.unraisable.exc_value,
                        cm.unraisable.exc_traceback,
                    )
                )
>               warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
E               pytest.PytestUnraisableExceptionWarning: Exception ignored in: <function _SSLProtocolTransport.__del__ at 0x1021ab9d0>
E               
E               Traceback (most recent call last):
E                 File "/Users/lobis/miniconda3/envs/uproot-38/lib/python3.8/asyncio/sslproto.py", line 321, in __del__
E                   _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
E               ResourceWarning: unclosed transport <asyncio.sslproto._SSLProtocolTransport object at 0x122382be0>

cm         = <_pytest.unraisableexception.catch_unraisable_exception object at 0x107742b20>
err_msg    = 'Exception ignored in'
msg        = ('Exception ignored in: <function _SSLProtocolTransport.__del__ at '
 '0x1021ab9d0>\n'
 '\n'
 'Traceback (most recent call last):\n'
 '  File '
 '"/Users/lobis/miniconda3/envs/uproot-38/lib/python3.8/asyncio/sslproto.py", '
 'line 321, in __del__\n'
 '    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)\n'
 'ResourceWarning: unclosed transport <asyncio.sslproto._SSLProtocolTransport '
 'object at 0x122382be0>\n')

../../../miniconda3/envs/uproot-38/lib/python3.8/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning

Process finished with exit code 1

@lobis lobis marked this pull request as ready for review October 30, 2023 17:13
@lobis lobis requested review from jpivarski and nsmith- October 30, 2023 20:21
Member

@jpivarski jpivarski left a comment

This is a good simplification! I see that you need work-arounds both for fsspec backends that don't support async and for Python 3.8, which doesn't have asyncio.to_thread, and that's okay.

Although sshfs has been added to the test dependencies, I think the only ssh test is disabled. I wonder if running sshd on the test-runner and connecting to

ssh `whoami`@localhost

would be an option? It's not a big deal.

Isolating the glitchiness of the test to S3 is good—we can provide the functionality without testing it because it's one of the things fsspec is supposed to do on its own. (We should only be responsible for using the fsspec API correctly in Uproot—there's a "separation of concerns.") It's too bad that it can't be reproduced outside of Uproot, but I know you put a lot of time into trying to do that.

I think this PR is ready to go as-is! Thanks!

@lobis
Collaborator Author

lobis commented Oct 31, 2023

Although sshfs has been added to the test dependencies, I think the only ssh test is disabled. I wonder if running sshd on the test-runner and connecting to

ssh `whoami`@localhost

would be an option? It's not a big deal.

Good idea. I can try this in a different PR; I can use the skhep_testdata cache directory after pulling the file in the same test.

Isolating the glitchiness of the test to S3 is good—we can provide the functionality without testing it because it's one of the things fsspec is supposed to do on its own. (We should only be responsible for using the fsspec API correctly in Uproot—there's a "separation of concerns.") It's too bad that it can't be reproduced outside of Uproot, but I know you put a lot of time into trying to do that.

I'm 90% sure it's s3fs, but I cannot say for certain. I created #1012 to continue debugging it. I would say the problem lies here: https://github.com/fsspec/s3fs/blob/main/s3fs/core.py#L541-L560. I don't think the socket is properly closed.

@lobis lobis merged commit 0f2c4da into main Oct 31, 2023
21 checks passed
@lobis lobis deleted the fsspec-optional-backends branch October 31, 2023 14:10