Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: local http server for tests #1010

Merged
merged 25 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ test = [
"pytest-rerunfailures",
"requests",
"scikit-hep-testdata",
"xxhash"
"xxhash",
"rangehttpserver"
]

[project.urls]
Expand Down
60 changes: 60 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot5/blob/main/LICENSE

import pytest
import threading
import contextlib
import skhep_testdata
from functools import partial

# The base http server does not support range requests. Watch https://github.com/python/cpython/issues/86809 for updates
from http.server import HTTPServer
from RangeHTTPServer import RangeRequestHandler

import uproot

Expand All @@ -9,3 +17,55 @@
def reset_classes():
uproot.model.reset_classes()
return


@contextlib.contextmanager
def serve():
# serve files from the skhep_testdata cache directory.
# This directory is initially empty and files are downloaded on demand
class Handler(RangeRequestHandler):
def _cache_file(self, path: str):
path = path.lstrip("/")
if path in skhep_testdata.known_files:
return skhep_testdata.data_path(path)
else:
raise FileNotFoundError(
f"File '{path}' not available in skhep_testdata"
)

def do_HEAD(self):
self._cache_file(self.path)
return super().do_HEAD()

def do_GET(self):
self._cache_file(self.path)
return super().do_GET()

server = HTTPServer(
server_address=("localhost", 0),
RequestHandlerClass=partial(
Handler, directory=skhep_testdata.local_files._cache_path()
),
)
server.server_activate()

def serve_forever(httpd=server):
with httpd:
httpd.serve_forever()

thread = threading.Thread(target=serve_forever, daemon=True)

try:
thread.start()
address, port = server.server_address
yield f"http://{address}:{port}"
finally:
# stop the server
server.shutdown()
thread.join()


@pytest.fixture(scope="module")
def server():
with serve() as server_url:
yield server_url
71 changes: 29 additions & 42 deletions tests/test_0001-source-class.py → tests/test_0001_source_class.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot5/blob/main/LICENSE

import os
import platform
import queue
import sys
from io import StringIO
import contextlib

import numpy
import pytest
Expand Down Expand Up @@ -33,9 +29,8 @@ def use_threads(request):


@pytest.mark.parametrize(
"use_threads,num_workers",
"use_threads, num_workers",
[(True, 1), (True, 2), (False, 0)],
indirect=["use_threads"],
)
def test_file(use_threads, num_workers, tmp_path):
filename = tmp_path / "tmp.raw"
Expand Down Expand Up @@ -63,9 +58,8 @@ def test_file(use_threads, num_workers, tmp_path):


@pytest.mark.parametrize(
"use_threads,num_workers",
"use_threads, num_workers",
[(True, 1), (True, 2), (False, 0)],
indirect=["use_threads"],
)
def test_file_fail(use_threads, num_workers, tmp_path):
filename = tmp_path / "tmp.raw"
Expand Down Expand Up @@ -123,30 +117,30 @@ def test_memmap_fail(use_threads, tmp_path):
...


@pytest.mark.skip(reason="RECHECK: example.com is flaky, too")
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
@pytest.mark.parametrize("use_threads", [True, False])
@pytest.mark.network
def test_http(use_threads):
def test_http(server, use_threads):
url = "https://example.com"
with uproot.source.http.HTTPSource(
"https://example.com",
url,
timeout=10,
num_fallback_workers=1,
use_threads=use_threads,
) as tmp:
) as source:
notifications = queue.Queue()
chunks = tmp.chunks([(0, 100), (50, 55), (200, 400)], notifications)
chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications)
one, two, three = (tobytes(chunk.raw_data) for chunk in chunks)
assert len(one) == 100
assert len(two) == 5
assert len(three) == 200
assert tmp.fallback is None
assert source.fallback is None

with uproot.source.http.MultithreadedHTTPSource(
"https://example.com", num_workers=1, timeout=10, use_threads=use_threads
) as tmp:
url, num_workers=1, timeout=10, use_threads=use_threads
) as source:
notifications = queue.Queue()
chunks = tmp.chunks([(0, 100), (50, 55), (200, 400)], notifications)
assert [tobytes(x.raw_data) for x in chunks] == [one, two, three]
chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications)
assert [tobytes(chunk.raw_data) for chunk in chunks] == [one, two, three]


def test_colons_and_ports():
Expand All @@ -163,7 +157,6 @@ def test_colons_and_ports():
) == ("https://example.com:443/something", "else")


@pytest.mark.skip(reason="RECHECK: example.com is flaky, too")
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
@pytest.mark.network
def test_http_port(use_threads):
Expand All @@ -190,19 +183,19 @@ def test_http_port(use_threads):
assert [tobytes(x.raw_data) for x in chunks] == [one, two, three]


@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
@pytest.mark.network
def test_http_size(use_threads):
@pytest.mark.parametrize("use_threads", [True, False])
def test_http_size(server, use_threads):
url = f"{server}/uproot-issue121.root"
with uproot.source.http.HTTPSource(
"https://scikit-hep.org/uproot3/examples/Zmumu.root",
url,
timeout=10,
num_fallback_workers=1,
use_threads=use_threads,
) as source:
size1 = source.num_bytes

with uproot.source.http.MultithreadedHTTPSource(
"https://scikit-hep.org/uproot3/examples/Zmumu.root",
url,
num_workers=1,
timeout=10,
use_threads=use_threads,
Expand Down Expand Up @@ -243,16 +236,15 @@ def test_http_fail(use_threads):
num_fallback_workers=1,
use_threads=use_threads,
)
with pytest.raises(Exception) as err:
with pytest.raises(Exception):
notifications = queue.Queue()
chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications)
chunks[0].raw_data


@pytest.mark.parametrize(
"use_threads,num_workers",
"use_threads, num_workers",
[(True, 1), (True, 2), (False, 0)],
indirect=["use_threads"],
)
@pytest.mark.network
def test_no_multipart(use_threads, num_workers):
Expand All @@ -272,9 +264,8 @@ def test_no_multipart(use_threads, num_workers):


@pytest.mark.parametrize(
"use_threads,num_workers",
"use_threads, num_workers",
[(True, 1), (True, 2), (False, 0)],
indirect=["use_threads"],
)
@pytest.mark.network
def test_no_multipart_fail(use_threads, num_workers):
Expand All @@ -284,21 +275,17 @@ def test_no_multipart_fail(use_threads, num_workers):
timeout=0.1,
use_threads=use_threads,
)
with pytest.raises(Exception) as err:
with pytest.raises(Exception):
notifications = queue.Queue()
chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications)
chunks[0].raw_data


@pytest.mark.parametrize(
"use_threads,num_workers",
[(True, 1), (True, 2), (False, 0)],
indirect=["use_threads"],
)
@pytest.mark.network
def test_fallback(use_threads, num_workers):
@pytest.mark.parametrize("use_threads, num_workers", [(True, 1), (True, 2), (False, 0)])
def test_fallback(server, use_threads, num_workers):
url = f"{server}/uproot-issue121.root"
with uproot.source.http.HTTPSource(
"https://scikit-hep.org/uproot3/examples/Zmumu.root",
url,
timeout=10,
num_fallback_workers=num_workers,
use_threads=use_threads,
Expand Down Expand Up @@ -343,7 +330,7 @@ def test_xrootd(use_threads):
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
def test_xrootd_deadlock(use_threads):
pytest.importorskip("XRootD")
# Attach this file to the "test_xrootd_deadlock" function so it leaks
# Attach this file to the "test_xrootd_deadlock" function, so it leaks
pytest.uproot_test_xrootd_deadlock_f = uproot.source.xrootd.XRootDResource(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
timeout=20,
Expand All @@ -356,7 +343,7 @@ def test_xrootd_deadlock(use_threads):
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
def test_xrootd_fail(use_threads):
pytest.importorskip("XRootD")
with pytest.raises(Exception) as err:
with pytest.raises(Exception):
uproot.source.xrootd.MultithreadedXRootDSource(
"root://wonky.cern/does-not-exist",
num_workers=1,
Expand Down Expand Up @@ -447,7 +434,7 @@ def get_chunk(Source, **kwargs):
@pytest.mark.parametrize("use_threads", [True, False], indirect=True)
def test_xrootd_vectorread_fail(use_threads):
pytest.importorskip("XRootD")
with pytest.raises(Exception) as err:
with pytest.raises(Exception):
uproot.source.xrootd.XRootDSource(
"root://wonky.cern/does-not-exist",
timeout=1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

import os
import queue
import sys
from io import StringIO

import numpy
import pytest

import uproot
Expand Down Expand Up @@ -68,12 +64,11 @@ def test_memmap(tmpdir):
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(reason="RECHECK: example.com is flaky, too")
@pytest.mark.network
def test_http_multipart():
def test_http_multipart(server):
url = f"{server}/uproot-issue121.root"
notifications = queue.Queue()
with uproot.source.http.HTTPSource(
"https://example.com", timeout=10, num_fallback_workers=1, use_threads=True
url, timeout=10, num_fallback_workers=1, use_threads=True
) as source:
chunks = source.chunks(
[(0, 100), (50, 55), (200, 400)], notifications=notifications
Expand All @@ -84,12 +79,11 @@ def test_http_multipart():
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(reason="RECHECK: example.com is flaky, too")
@pytest.mark.network
def test_http():
def test_http(server):
url = f"{server}/uproot-issue121.root"
notifications = queue.Queue()
with uproot.source.http.MultithreadedHTTPSource(
"https://example.com", timeout=10, num_workers=1, use_threads=True
url, timeout=10, num_workers=1, use_threads=True
) as source:
chunks = source.chunks(
[(0, 100), (50, 55), (200, 400)], notifications=notifications
Expand All @@ -100,12 +94,11 @@ def test_http():
expected.pop((chunk.start, chunk.stop))


@pytest.mark.skip(reason="RECHECK: example.com is flaky, too")
@pytest.mark.network
def test_http_workers():
def test_http_workers(server):
url = f"{server}/uproot-issue121.root"
notifications = queue.Queue()
with uproot.source.http.MultithreadedHTTPSource(
"https://example.com", timeout=10, num_workers=2, use_threads=True
url, timeout=10, num_workers=2, use_threads=True
) as source:
chunks = source.chunks(
[(0, 100), (50, 55), (200, 400)], notifications=notifications
Expand All @@ -116,11 +109,11 @@ def test_http_workers():
expected.pop((chunk.start, chunk.stop))


@pytest.mark.network
def test_http_fallback():
def test_http_fallback(server):
url = f"{server}/uproot-issue121.root"
notifications = queue.Queue()
with uproot.source.http.HTTPSource(
"https://scikit-hep.org/uproot3/examples/Zmumu.root",
url,
timeout=10,
num_fallback_workers=1,
use_threads=True,
Expand All @@ -134,11 +127,11 @@ def test_http_fallback():
expected.pop((chunk.start, chunk.stop))


@pytest.mark.network
def test_http_fallback_workers():
def test_http_fallback_workers(server):
url = f"{server}/uproot-issue121.root"
notifications = queue.Queue()
with uproot.source.http.HTTPSource(
"https://scikit-hep.org/uproot3/examples/Zmumu.root",
url,
timeout=10,
num_fallback_workers=5,
use_threads=True,
Expand Down
Loading