
Correct Way to Setup PyTest Fixture #3540

Closed
seanlaw opened this issue Mar 3, 2020 · 17 comments

Comments

@seanlaw

seanlaw commented Mar 3, 2020

Currently, I have several test files being executed that all require using a dask client that is being setup via a PyTest fixture:

from dask.distributed import Client, LocalCluster
import pytest


@pytest.fixture(scope="module")
def dask_client():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()

This fixture sits at the top of each test file, and the dask_client is then accessed with:

def test_one(dask_client):
    ...

def test_two(dask_client):
    ...

def test_three(dask_client):
    ...

Based on my reading of the PyTest documentation, it is my understanding that the dask_client is created once at the start of the execution of the test file (with scope="module"), each test within the test file is executed, and then the dask_client is torn down before the next test file (that also requires a dask_client) does the same thing.
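As an aside, rather than repeating this block in every file, the fixture could live in a shared conftest.py (a sketch; it relies only on pytest's standard conftest.py fixture discovery, and doesn't change the behavior described below):

```python
# conftest.py -- fixtures defined here are discovered by pytest automatically,
# so individual test files no longer need their own copy of dask_client.
from dask.distributed import Client, LocalCluster
import pytest


@pytest.fixture(scope="module")
def dask_client():
    # One cluster/client pair per test module, torn down after the module runs
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()
```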

Since the LocalCluster is set up with n_workers=2, threads_per_worker=2, I naively expected a maximum of 2 worker processes with 2 threads each. However, according to the Activity Monitor on my 13" MacBook Pro, I see the thread count climb to 16 for one process:

[screenshot: Activity Monitor showing a Python process with 16 threads]

Note that I don't have any other Python processes running. All of the Python processes shown in the image appear to be the result of tests starting and stopping while the dask_client teardown catches up. However, occasionally, simply re-running the exact same test suite multiple times produces a CancelledError:

../../miniconda3/lib/python3.7/site-packages/distributed/client.py:1885: in gather
    asynchronous=asynchronous,
../../miniconda3/lib/python3.7/site-packages/distributed/client.py:767: in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
../../miniconda3/lib/python3.7/site-packages/distributed/utils.py:345: in sync
    raise exc.with_traceback(tb)
../../miniconda3/lib/python3.7/site-packages/distributed/utils.py:329: in f
    result[0] = yield future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <tornado.gen.Runner object at 0x1c27e07860>

    def run(self) -> None:
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if future is None:
                    raise Exception("No pending future")
                if not future.done():
                    return
                self.future = None
                try:
                    exc_info = None

                    try:
>                       value = future.result()
E                       concurrent.futures._base.CancelledError

../../miniconda3/lib/python3.7/site-packages/tornado/gen.py:735: CancelledError
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
============================================================ 1 failed, 175 passed in 52.44s ============================================================
Error: pytest encountered exit code 1

Based on my past experience, a CancelledError is common when running a distributed cluster whose machines have mismatched Python packages installed. In this case, however, we are running a LocalCluster, and it appears that all of the resources are being used up and tornado is hanging. Again, the CancelledError happens only sporadically when I re-run the exact same test suite multiple times.

I'm guessing that I'm doing things incorrectly or my assumptions are incorrect. Is there a correct/proper way to use Dask LocalCluster with PyTest so that all tests are limited to only 2 cores and 2 threads per core (instead of getting up to 16 threads)?

Initially, a hacky workaround was to limit the number of tests within each test file, which resulted in a test suite with many separate test files (each setting up and tearing down its own dask_client) but with only a handful of tests per file. This seemed to keep the number of threads from climbing. However, this workaround is no longer sufficient, and I'm still seeing the same CancelledError as the test suite grows. I've also tried adding cluster restarts in between tests, adding a few seconds of sleep after teardown, and setting up/tearing down the dask_client at the test level, but these significantly slow down the test suite.

The test suite can be found here

@TomAugspurger
Member

Sorry, I haven't looked too closely at the tests in your repo, but is restructuring the tests to be more similar to distributed's an option? Specifically, using the gen_cluster decorator, as described in https://distributed.dask.org/en/latest/develop.html#writing-tests (that's a bit out of date, going to update it now)?

@mrocklin
Member

mrocklin commented Mar 3, 2020 via email

@seanlaw
Author

seanlaw commented Mar 3, 2020

I have a few questions:

  1. Should it matter if I was using LocalCluster for my testing previously vs using @gen_cluster? I'm using dask.distributed to perform an embarrassingly parallel/chunked computation so I want to test and make sure that the single threaded computation will yield the same result as a manually chunked/embarrassingly parallel computation. Essentially, the function(s) that I need to test takes in a dask client, performs some chunked computation, and returns the results. So, it will look something like this:
@numba.njit(parallel=True)
def _some_func(chunk):
    # Do some computation on the chunk of data
    return result


def some_func(dask_client, x):
    ...
    chunks = split_array_into_chunks(x)
    futures = []
    for chunk in chunks:
        futures.append(
            dask_client.submit(_some_func, chunk)
        )

    results = dask_client.gather(futures)

    return results
  2. Maybe I'm overlooking something, but I can't seem to find where this is covered in the dask.distributed docs. What is the proper way to use pytest.mark attributes along with @gen_cluster? I can usually pass in data and filter warnings in PyTest with:
test_data = [np.random.randint(1000)]

@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning")
@pytest.mark.parametrize("x", test_data)
def test_some_func(x):
    y = some_func(x)
    ...
    npt.assert_almost_equal(the_correct_answer, y)

I can get the @gen_cluster example in the dask.distributed docs to work but I can't seem to figure out how to combine both pytest.mark and @gen_cluster together in one test.

@TomAugspurger
Member

Some marks will work (e.g. filterwarnings). I think parametrize causes issues, so the actual test is written in a closure:

@pytest.mark.parametrize("direct_to_workers", [True, False])
def test_client_actions(direct_to_workers):
    @gen_cluster(client=True)
    def test(c, s, a, b):
        c = yield Client(
            s.address, asynchronous=True, direct_to_workers=direct_to_workers
        )

        counter = c.submit(Counter, workers=[a.address], actor=True)
        assert isinstance(counter, Future)
        counter = yield counter
        assert counter._address
        assert hasattr(counter, "increment")
        assert hasattr(counter, "add")
        assert hasattr(counter, "n")

        n = yield counter.n
        assert n == 0

        assert counter._address == a.address
        assert isinstance(a.actors[counter.key], Counter)
        assert s.tasks[counter.key].actor

        yield [counter.increment(), counter.increment()]
        n = yield counter.n
        assert n == 2

        counter.add(10)
        while (yield counter.n) != 10 + 2:
            n = yield counter.n
            yield gen.sleep(0.01)

        yield c.close()

    test()

@seanlaw
Author

seanlaw commented Mar 3, 2020

@TomAugspurger that was the missing link! I think I'm now able to transition everything over to using @gen_cluster. Thank you for your help!

@seanlaw
Author

seanlaw commented Mar 3, 2020

So, I was able to port all of my tests over to using @gen_cluster and all of the tests pass. In my testing, I separate it into two parts:

  1. I just run all of the tests (with Numba JIT and CUDA compilation enabled) and look for test failures
  2. I follow this up by disabling Numba JIT and CUDA Compiled Functions (so they are all Python only tests), and then re-run all of the tests, and assess code coverage

For the first part, all of the tests pass. For the second part (coverage testing), all of the tests pass as well, but for the tests that use @gen_cluster, coverage is unable to verify that the Python functions under test are actually called. I'm guessing that's because those functions are run on a different forked process. Is there a way to limit this to only 1 process and 2 threads per process? This would have to be controlled either at the command line (when calling py.test) or via some environment variable, not at the code level.
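For what it's worth, the JIT half of this can be driven entirely from the environment: Numba's documented NUMBA_DISABLE_JIT variable forces @numba.njit functions to run as plain Python. The invocation below is just one possible shape of the two-pass setup described above:

```shell
# Pass 1: normal run, JIT and CUDA compilation enabled
python -m pytest tests/

# Pass 2: coverage run with Numba JIT disabled, so the pure-Python bodies
# of @numba.njit functions actually execute and are visible to coverage
NUMBA_DISABLE_JIT=1 coverage run -m pytest tests/
coverage report
```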

@seanlaw seanlaw reopened this Mar 3, 2020
@mrocklin
Member

mrocklin commented Mar 3, 2020 via email

@seanlaw
Author

seanlaw commented Mar 3, 2020

Hmmm, according to the docs for coverage.py

Coverage.py can measure multi-threaded programs by default. If you are using more exotic concurrency, with the multiprocessing, greenlet, eventlet, or gevent libraries, then coverage.py will get very confused. Use the --concurrency switch to properly measure programs using these libraries. Give it a value of multiprocessing, thread, greenlet, eventlet, or gevent. Values other than thread require the C extension.

So, in theory this should work out of the box with @gen_cluster if dask.distributed is running in threads.
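And if the workers do end up in separate processes, the switch can be set once in .coveragerc rather than on every invocation. A sketch, per the coverage.py docs: multiprocessing requires the C extension, and parallel = True implies a coverage combine step before reporting:

```ini
# .coveragerc (sketch)
[run]
concurrency = multiprocessing
parallel = True
```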

@mrocklin
Member

mrocklin commented Mar 3, 2020 via email

@seanlaw
Author

seanlaw commented Mar 3, 2020

Okay, I ran a couple more tests, and it looks like the problem is the closure (i.e., the nested function). For some reason, coverage.py isn't able to pick up anything inside the inner test() function:

test_data = [np.random.randint(1000)]

@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning")
@pytest.mark.parametrize("x", test_data)
def test_some_func(x):
    @gen_cluster(client=True)
    def test(c, s, a, b):
        y = some_func(c, x)  # Coverage can't seem to see this function call
        ...
        npt.assert_almost_equal(the_correct_answer, y)

This next version works fine (i.e., coverage easily detects the some_func call), but it is exactly what was causing the CancelledError in the first place:

@pytest.fixture(scope="module")
def dask_client():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()

test_data = [np.random.randint(1000)]

@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning")
@pytest.mark.parametrize("x", test_data)
def test_some_func(x, dask_client):
    @gen_cluster(client=True)
    def test(c, s, a, b):
        # Do nothing in the closure
        return

    y = some_func(dask_client, x)
    ...
    npt.assert_almost_equal(the_correct_answer, y)

But in this case, when we move the some_func function call inside of the closure (but still using the client that is generated from the pytest fixture), coverage cannot detect the some_func function call:

@pytest.fixture(scope="module")
def dask_client():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()

test_data = [np.random.randint(1000)]

@pytest.mark.filterwarnings("ignore:\\s+Port 8787 is already in use:UserWarning")
@pytest.mark.parametrize("x", test_data)
def test_some_func(x, dask_client):
    @gen_cluster(client=True)
    def test(c, s, a, b):
        # Note that we use a dask_client from the pytest fixture above
        y = some_func(dask_client, x)  # Coverage can't seem to see this function call
        ...
        npt.assert_almost_equal(the_correct_answer, y)

Is there a more manual way to perform the same setup/teardown that would avoid the closure? Something like:

def test_some_func(x):
    # Add dask client setup code here
    y = some_func(c, x)  # Coverage can't seem to see this function call
    ...
    npt.assert_almost_equal(the_correct_answer, y)
    # Add dask client tear down code
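Concretely, something like this sketch, where some_func is a stand-in for the real function under test, and processes=False is an extra assumption that keeps the workers in-process (threads only), which should also help coverage see the worker-side code:

```python
from dask.distributed import Client, LocalCluster
import numpy.testing as npt


def some_func(client, x):
    # Stand-in for the real chunked computation: double each chunk on the cluster
    futures = [client.submit(lambda chunk: 2 * chunk, chunk) for chunk in x]
    return client.gather(futures)


def test_some_func():
    # Manual setup: the same shape as the module-scoped fixture, but inline,
    # so the some_func call stays in this frame where coverage can see it
    cluster = LocalCluster(n_workers=2, threads_per_worker=2, processes=False)
    client = Client(cluster)
    try:
        y = some_func(client, [1, 2, 3])
        npt.assert_almost_equal(y, [2, 4, 6])
    finally:
        # Manual teardown
        client.close()
        cluster.close()
```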

@TomAugspurger
Member

It may be worth making gen_cluster a proper pytest fixture or mark so that we play more nicely with parametrize and others. I'm not really familiar with how that's done though.

@mrocklin
Member

mrocklin commented Mar 4, 2020 via email

@seanlaw
Author

seanlaw commented Mar 4, 2020

@mrocklin Can you point me to any good examples where you used the async with context managers together with the cleanup context manager while interacting with parametrize? I could try emulating that in my code.

@mrocklin
Member

mrocklin commented Mar 5, 2020

If you grep for async with Scheduler you'll find some. Here is an example:

@pytest.mark.asyncio
@pytest.mark.parametrize("Worker", [Worker, Nanny])
async def test_protocol_from_scheduler_address(Worker):
    ucp = pytest.importorskip("ucp")

    async with Scheduler(protocol="ucx") as s:
        assert s.address.startswith("ucx://")
        async with Worker(s.address) as w:
            assert w.address.startswith("ucx://")
            async with Client(s.address, asynchronous=True) as c:
                info = c.scheduler_info()
                assert info["address"].startswith("ucx://")

@seanlaw
Author

seanlaw commented Mar 13, 2020

One thing that is still perplexing from my original code is that when I did:

@pytest.fixture(scope="module")
def dask_client():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    yield client
    # teardown
    client.close()
    cluster.close()

def test_some_func_1(dask_client):
    ...

def test_some_func_2(dask_client):
    ...

This successfully sets up the LocalCluster and tears it down after all of the tests are run. However, the number of threads climbs into the double digits (when I expected only 2 workers with 2 threads each):

[screenshot: Activity Monitor showing double-digit thread counts for a Python process]

Is this normal? Is there something else I need to do in order to tear down the LocalCluster properly or more elegantly?

@mrocklin
Member

mrocklin commented Mar 13, 2020 via email

@jmoralez

jmoralez commented Apr 6, 2021

@seanlaw your approach of defining a cluster fixture instead of using the client one is brilliant. I proposed adopting that in microsoft/LightGBM#4159 which was merged today and reduced the CI time from 20 minutes to 3 minutes. The folks at xgboost are looking into adopting it as well (dmlc/xgboost#6816).

I think this approach should be in https://distributed.dask.org/en/latest/develop.html#writing-tests
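For later readers, the pattern referred to is roughly the following sketch (fixture names are illustrative): a long-lived, module-scoped cluster fixture, with a cheap per-test Client layered on top of it:

```python
from dask.distributed import Client, LocalCluster
import pytest


@pytest.fixture(scope="module")
def cluster():
    # Pay the cluster startup cost once per test module...
    dask_cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    yield dask_cluster
    dask_cluster.close()


@pytest.fixture
def dask_client(cluster):
    # ...but give every test a fresh Client, so state left behind by one
    # test (pending futures, published datasets) can't leak into the next
    with Client(cluster) as client:
        yield client
```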
