diff --git a/Lib/unittest/__init__.py b/Lib/unittest/__init__.py index 5ff1bf37b16965..3368b292ac7306 100644 --- a/Lib/unittest/__init__.py +++ b/Lib/unittest/__init__.py @@ -44,7 +44,7 @@ def testMultiply(self): SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. """ -__all__ = ['TestResult', 'TestCase', 'TestSuite', +__all__ = ['TestResult', 'TestCase', 'TestSuite', 'AsyncioTestCase', 'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main', 'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless', 'expectedFailure', 'TextTestResult', 'installHandler', @@ -58,7 +58,7 @@ def testMultiply(self): from .result import TestResult from .case import (addModuleCleanup, TestCase, FunctionTestCase, SkipTest, skip, - skipIf, skipUnless, expectedFailure) + skipIf, skipUnless, expectedFailure, AsyncioTestCase) from .suite import BaseTestSuite, TestSuite from .loader import (TestLoader, defaultTestLoader, makeSuite, getTestCaseNames, findTestCases) diff --git a/Lib/unittest/case.py b/Lib/unittest/case.py index a157ae8a14bcbe..dc322b127b55d7 100644 --- a/Lib/unittest/case.py +++ b/Lib/unittest/case.py @@ -1,5 +1,6 @@ """Test case implementation""" +import asyncio import sys import functools import difflib @@ -23,25 +24,20 @@ 'Set self.maxDiff to None to see it.') class SkipTest(Exception): - """ - Raise this exception in a test to skip it. + """Raise this exception in a test to skip it. Usually you can use TestCase.skipTest() or one of the skipping decorators instead of raising this directly. """ class _ShouldStop(Exception): - """ - The test should stop. - """ + """The test should stop.""" class _UnexpectedSuccess(Exception): - """ - The test was supposed to fail, but it didn't! - """ + """The test was supposed to fail, but it didn't!""" -class _Outcome(object): +class _Outcome: def __init__(self, result=None): self.expecting_failure = False self.result = result @@ -81,6 +77,16 @@ def testPartExecutor(self, test_case, isTest=False): self.success = self.success and old_success +class _IgnoredOutcome: + def __init__(self, result=None): + self.expecting_failure = None + self.success = True + + @contextlib.contextmanager + def testPartExecutor(self, test_case, isTest=False): + yield + + def _id(obj): return obj @@ -109,9 +115,7 @@ def doModuleCleanups(): def skip(reason): - """ - Unconditionally skip a test. - """ + """Unconditionally skip a test.""" def decorator(test_item): if not isinstance(test_item, type): @functools.wraps(test_item) @@ -125,17 +129,13 @@ def skip_wrapper(*args, **kwargs): return decorator def skipIf(condition, reason): - """ - Skip a test if the condition is true. - """ + """Skip a test if the condition is true.""" if condition: return skip(reason) return _id def skipUnless(condition, reason): - """ - Skip a test unless the condition is true. - """ + """Skip a test unless the condition is true.""" if not condition: return skip(reason) return _id @@ -171,8 +171,7 @@ def __init__(self, expected, test_case, expected_regex=None): self.msg = None def handle(self, name, args, kwargs): - """ - If args is empty, assertRaises/Warns is being used as a + """If args is empty, assertRaises/Warns is being used as a context manager, so check for a 'msg' kwarg and return self. If args is not empty, call a callable passing positional and keyword arguments. @@ -367,7 +366,7 @@ def __iter__(self): yield k -class TestCase(object): +class TestCase: """A class whose instances are single test cases. By default, the test code itself should be placed in a method named @@ -633,17 +632,7 @@ def run(self, result=None): outcome = _Outcome(result) try: self._outcome = outcome - - with outcome.testPartExecutor(self): - self.setUp() - if outcome.success: - outcome.expecting_failure = expecting_failure - with outcome.testPartExecutor(self, isTest=True): - testMethod() - outcome.expecting_failure = False - with outcome.testPartExecutor(self): - self.tearDown() - + self._runTest(testMethod, outcome, expecting_failure) self.doCleanups() for test, reason in outcome.skipped: self._addSkip(result, test, reason) @@ -658,6 +647,7 @@ def run(self, result=None): result.addSuccess(self) return result finally: + self._terminateTest(outcome) result.stopTest(self) if orig_result is None: stopTestRun = getattr(result, 'stopTestRun', None) @@ -701,14 +691,27 @@ def doClassCleanups(cls): def __call__(self, *args, **kwds): return self.run(*args, **kwds) + def _runTest(self, testMethod, outcome, expecting_failure): + """Run the test and collect errors into a TestResult""" + with outcome.testPartExecutor(self): + self.setUp() + if outcome.success: + outcome.expecting_failure = expecting_failure + with outcome.testPartExecutor(self, isTest=True): + testMethod() + outcome.expecting_failure = False + with outcome.testPartExecutor(self): + self.tearDown() + + def _terminateTest(self, outcome): + """Hook that is called after a test run is complete.""" + pass + def debug(self): """Run the test without collecting errors in a TestResult""" - self.setUp() - getattr(self, self._testMethodName)() - self.tearDown() - while self._cleanups: - function, args, kwargs = self._cleanups.pop(-1) - function(*args, **kwargs) + self._runTest(getattr(self, self._testMethodName), + _IgnoredOutcome(), None) + self.doCleanups() def skipTest(self, reason): """Skip this test.""" @@ -1483,3 +1486,149 @@ def shortDescription(self): def __str__(self): return "{} {}".format(self.test_case, self._subDescription()) + + +class AsyncioTestCase(TestCase): + """Extension of `unittest.TestCase` for concurrent test cases. + + This extension of `unittest.TestCase` runs the instance methods + decorated with the **async**/**await** syntax on a event loop. + The event loop is created anew for each test method run and + destroyed when the test method has exited. Both the `setUp` and + `tearDown` method are decorated with ``async``. Sub-classes that + extend either method MUST call the parent method using the ``await`` + keyword. + + If individual test methods are co-routines (as defined by + ``asyncio.iscoroutinefunction``), then they will be run on the + active event loop; otherwise, they will be called as simple methods. + In the latter case, the event loop IS NOT RUNNING; however, you + can use it to run async code if necessary. + + Clean up functions will be run on the event loop if they are + detected as co-routines; otherwise, they are called as standard + functions. After ALL cleanup calls are completed, the event loop + is stopped and closed. + + When subclassing AsyncioTestCase, the event loop is available + via the ``loop`` property. Tests can assume that `self.loop` + refers to the event loop created for the test case. It is also + the active event loop as returned by ``asyncio.get_event_loop()``. + Test cases SHOULD NOT change the active event loop during the test + run. + + The lifecycle of a single test execution is thus:: + + - ``TestCase`` instance is created + - ``setUpClass`` is executed + - for each test method + - new event loop is created + - ``setUp`` is executed on the event loop + - if ``setUp`` succeeds + - the test method is run on the event loop if it is a + co-routine; otherwise it is simply called + - ``tearDown`` is executed on the event loop + - the event loop is stopped if it is somehow running + - ``doCleanups`` is called + - callables registered with ``addCleanup`` are called + in reverse-order executing them on the event loop if + necessary + - the event loop is stopped if necessary + - the event loop is closed and unregistered + + The process is tightly managed by the ``_runTest`` and ``doCleanups`` + methods. Care must be taken if extending this class to ensure that + the event loop is properly managed. + + """ + + def __init__(self, methodName='runTest'): + super().__init__(methodName) + self.__loop = None + + @classmethod + def setUpClass(cls): + """Hook method for one-time initialization. + + Subclasses MUST invoke this method when extending it. + """ + super().setUpClass() + cls.__saved_policy = asyncio.events._event_loop_policy + + @classmethod + def tearDownClass(cls): + """Hook method for one-time cleanup. + + Subclasses MUST invoke this method when extending it. + """ + super().tearDownClass() + asyncio.set_event_loop_policy(cls.__saved_policy) + + async def asyncSetUp(self): + """Hook method for setting up the test fixture before exercising it. + + This method invokes ``setUp`` inline so subclasses MUST invoke this + method using the ``await`` keyword when extending it. + """ + self.setUp() + + async def asyncTearDown(self): + """Hook method for deconstructing the test fixture after testing it. + + This method invokes ``tearDown`` inline so subclasses MUST invoke + this method using the ``await`` keyword when extending it. + """ + self.tearDown() + + @property + def loop(self): + """Active event loop for the test case.""" + if self.__loop is None: + self.__loop = asyncio.new_event_loop() + self.__loop.set_debug(True) + asyncio.set_event_loop(self.__loop) + return self.__loop + + def _runTest(self, testMethod, outcome, expecting_failure): + try: + with outcome.testPartExecutor(self): + self.loop.run_until_complete(self.asyncSetUp()) + if outcome.success: + outcome.expecting_failure = expecting_failure + with outcome.testPartExecutor(self, isTest=True): + if asyncio.iscoroutinefunction(testMethod): + self.loop.run_until_complete(testMethod()) + else: + testMethod() + outcome.expecting_failure = False + with outcome.testPartExecutor(self): + self.loop.run_until_complete(self.asyncTearDown()) + finally: + if self.loop.is_running(): + self.loop.stop() + + def _terminateTest(self, outcome): + super()._terminateTest(outcome) + if self.loop.is_running(): + self.loop.stop() + self.loop.run_until_complete( + self.loop.shutdown_asyncgens()) + asyncio.set_event_loop(None) + if not self.loop.is_closed(): + self.loop.close() + self.__loop = None + + def doCleanups(self): + """Execute all cleanup functions. + + Normally called for you after tearDown. + """ + outcome = self._outcome or _Outcome() + while self._cleanups: + cleanup_func, args, kwargs = self._cleanups.pop() + with outcome.testPartExecutor(self): + if asyncio.iscoroutinefunction(cleanup_func): + self.loop.run_until_complete( + cleanup_func(*args, **kwargs)) + else: + cleanup_func(*args, **kwargs) diff --git a/Lib/unittest/test/test_asyncio_test_case.py b/Lib/unittest/test/test_asyncio_test_case.py new file mode 100644 index 00000000000000..97add22f1a5125 --- /dev/null +++ b/Lib/unittest/test/test_asyncio_test_case.py @@ -0,0 +1,281 @@ +import asyncio +import collections +import unittest + + +class Test: + "Keep these TestCase classes out of the main namespace" + + class SingleTestAsyncTestCase(unittest.AsyncioTestCase): + def test_one(self): + pass + + +def run_test_case(test_class, should_fail=False): + result = unittest.TestResult() + result.failfast = True + suite = unittest.makeSuite(test_class) + suite.run(result) + if result.testsRun == 0: + raise AssertionError(f'test {test_class} not run') + if should_fail and result.wasSuccessful(): + raise AssertionError(f'test {test_class} unexpectedly succeeded') + elif not should_fail and not result.wasSuccessful(): + raise AssertionError(f'test {test_class} unexpectedly failed') + + +class EventLoopTracking: + class Loop(asyncio.SelectorEventLoop): + def __init__(self, tracker, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__tracker = tracker + + def run_forever(self): + super().run_forever() + self.__tracker.register_call('loop.run_forever', self) + + def close(self): + super().close() + self.__tracker.register_call('loop.close', self) + + def stop(self): + super().stop() + self.__tracker.register_call('loop.stop', self) + + class Policy(asyncio.DefaultEventLoopPolicy): + def __init__(self, tracker, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__tracker = tracker + + def new_event_loop(self): + loop = self.__tracker.loop_factory(self.__tracker) + self.__tracker.register_call('policy.new_event_loop', loop) + return loop + + def set_event_loop(self, loop): + super().set_event_loop(loop) + self.__tracker.register_call('policy.set_event_loop', loop) + + def __init__(self): + self.__saved_loop_policy = asyncio.events._event_loop_policy + self.loop_factory = self.Loop + self.call_counter = collections.defaultdict(int) + self.loop_events = [] + + def install(self): + asyncio.set_event_loop_policy(self.Policy(self)) + + def uninstall(self): + # ALWAYS reset the event loop policy when we are done + asyncio.set_event_loop_policy(self.__saved_loop_policy) + + def register_call(self, call, obj): + self.call_counter[call] += 1 + self.loop_events.append((call, obj)) + + +class Test_AsyncioTestCase(unittest.TestCase): + def setUp(self): + super().setUp() + self.event_loop_tracking = EventLoopTracking() + + def tearDown(self): + super().tearDown() + self.event_loop_tracking.uninstall() + + def test_loop_created_per_test(self): + class TestCase(unittest.AsyncioTestCase): + async def test_one(self): + pass + + def test_two(self): + pass + + self.event_loop_tracking.install() + run_test_case(TestCase) + self.assertEqual( + 2, self.event_loop_tracking.call_counter['policy.new_event_loop']) + + def test_loop_policy_saved_and_restored(self): + with self.subTest('initial state'): + # first test with a initial policy of `None` to mimic + # the initial state + asyncio.set_event_loop_policy(None) + run_test_case(Test.SingleTestAsyncTestCase) + self.assertIsNone(asyncio.events._event_loop_policy) + + with self.subTest('custom policy'): + # next test with a custom policy + custom_policy = asyncio.DefaultEventLoopPolicy() + asyncio.set_event_loop_policy(custom_policy) + run_test_case(Test.SingleTestAsyncTestCase) + self.assertIs(custom_policy, asyncio.get_event_loop_policy()) + + def test_event_loop_closed_and_cleared(self): + self.event_loop_tracking.install() + run_test_case(Test.SingleTestAsyncTestCase) + self.assertEqual(1, + self.event_loop_tracking.call_counter['loop.close']) + self.assertNotEqual( + 0, self.event_loop_tracking.call_counter['policy.set_event_loop']) + + for event, loop in reversed(self.event_loop_tracking.loop_events): + if event == 'policy.set_event_loop': + self.assertIsNone( + loop, 'final call to set_event_loop should clear loop') + break + + def test_event_loop_stopped_when_test_fails(self): + class TestCase(unittest.AsyncioTestCase): + def test_one(self): + self.fail() + + self.event_loop_tracking.install() + run_test_case(TestCase, should_fail=True) + self.assertEqual( + self.event_loop_tracking.call_counter['loop.run_forever'], + self.event_loop_tracking.call_counter['loop.stop'], + ) + + def test_async_and_normal_cleanups_are_run(self): + class TestCase(Test.SingleTestAsyncTestCase): + state = {'async_called': False, 'normal_called': False} + + def setUp(self): + self.addCleanup(self.async_cleanup) + self.addCleanup(self.normal_cleanup) + + async def async_cleanup(self): + self.state['async_called'] = True + + def normal_cleanup(self): + self.state['normal_called'] = True + + run_test_case(TestCase) + self.assertTrue(TestCase.state['async_called'], + 'async cleanup not called') + self.assertTrue(TestCase.state['normal_called'], + 'normal cleanup not called') + + def test_test_methods_not_run_if_setup_fails(self): + class TestCase(unittest.AsyncioTestCase): + state = {'test_called': False} + + def setUp(self): + raise RuntimeError() + + def test_one(self): + self.state['test_called'] = True + + run_test_case(TestCase, should_fail=True) + self.assertFalse(TestCase.state['test_called'], + 'test method called unexpectedly') + + def test_that_teardown_run_on_test_failure(self): + class TestCase(unittest.AsyncioTestCase): + state = {'teardown_called': False} + + def tearDown(self): + super().tearDown() + self.state['teardown_called'] = True + + def test_one(self): + self.fail() + + run_test_case(TestCase, should_fail=True) + self.assertTrue(TestCase.state['teardown_called'], + 'tearDown not called') + + def test_that_teardown_not_run_on_setup_failure(self): + class TestCase(Test.SingleTestAsyncTestCase): + state = {'teardown_called': False} + + def setUp(self): + raise RuntimeError() + + def tearDown(self): + super().tearDown() + self.state['teardown_called'] = True + + run_test_case(TestCase, should_fail=True) + self.assertFalse(TestCase.state['teardown_called'], + 'tearDown called unexpectedly') + + def test_that_test_method_not_run_when_async_setup_failure(self): + class TestCase(unittest.AsyncioTestCase): + state = {'test_run': False} + + async def asyncSetUp(self): + await super().asyncSetUp() + raise RuntimeError() + + def test_one(self): + self.state['test_run'] = True + + run_test_case(TestCase, should_fail=True) + self.assertFalse(TestCase.state['test_run'], + 'test method called unexpectedly') + + def test_that_async_teardown_run_when_test_method_fails(self): + class TestCase(unittest.AsyncioTestCase): + state = {'teardown_called': False} + + async def asyncTearDown(self): + await super().asyncTearDown() + self.state['teardown_called'] = True + + def test_one(self): + self.fail() + + run_test_case(TestCase, should_fail=True) + self.assertTrue(TestCase.state['teardown_called'], + 'asyncTearDown method not called') + + def test_that_async_teardown_not_run_when_async_setup_failure(self): + class TestCase(Test.SingleTestAsyncTestCase): + state = {'teardown_called': False} + + async def asyncSetUp(self): + await super().asyncSetUp() + raise RuntimeError() + + async def asyncTearDown(self): + await super().asyncTearDown() + self.state['teardown_called'] = True + + run_test_case(TestCase, should_fail=True) + self.assertFalse(TestCase.state['teardown_called'], + 'asyncTearDown called unexpectedly') + + def test_that_do_cleanups_can_be_called_from_within_tests(self): + class TestCase(unittest.AsyncioTestCase): + async def test_case(self): + starting_loop = self.loop + self.doCleanups() + self.assertIs(starting_loop, self.loop, + 'doCleanups altered event loop') + + run_test_case(TestCase) + + def test_unclosed_async_generators(self): + # XXX - I believe that this tests the closure of asynchronous + # generators in AsyncioTestCase._terminateTest but I + # can't reliably catch it there in a debugger?! + async def gen(): + for n in range(0, 10): + yield n + + class TestCase(unittest.AsyncioTestCase): + async def test_case(self): + # Call the async generator without terminating + # it. This is what requires the call to + # shutdown_asyncgens() in _terminateTest(). + # See PEP-525 for details. + g = gen() + await g.asend(None) + + run_test_case(TestCase) + + +if __name__ == "__main__": + unittest.main() diff --git a/Misc/NEWS.d/next/Library/2018-11-04-08-01-54.bpo-32972.MLIb8q.rst b/Misc/NEWS.d/next/Library/2018-11-04-08-01-54.bpo-32972.MLIb8q.rst new file mode 100644 index 00000000000000..d2385259a838ce --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-11-04-08-01-54.bpo-32972.MLIb8q.rst @@ -0,0 +1,3 @@ +Add ``unittest.AsyncioTestCase`` for testing asyncio-based code. Inspired +by https://asynctest.readthedocs.io/en/latest/ and contributed by Dave +Shawley.