-
-
Notifications
You must be signed in to change notification settings - Fork 30.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bpo-32972: Add unittest.AsyncioTestCase #10296
Changes from 7 commits
564ada6
9a780c6
60d3514
6fa2911
231aab5
88a697c
9f6263e
694e6d5
adb4c96
3c0ac43
7eb7899
eed94cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
"""Test case implementation""" | ||
|
||
import asyncio | ||
import sys | ||
import functools | ||
import difflib | ||
|
@@ -81,6 +82,16 @@ def testPartExecutor(self, test_case, isTest=False): | |
self.success = self.success and old_success | ||
|
||
|
||
class _IgnoredOutcome(object): | ||
def __init__(self, result=None): | ||
self.expecting_failure = None | ||
self.success = True | ||
|
||
@contextlib.contextmanager | ||
def testPartExecutor(self, test_case, isTest=False): | ||
yield | ||
|
||
|
||
def _id(obj): | ||
return obj | ||
|
||
|
@@ -601,17 +612,7 @@ def run(self, result=None): | |
outcome = _Outcome(result) | ||
try: | ||
self._outcome = outcome | ||
|
||
with outcome.testPartExecutor(self): | ||
self.setUp() | ||
if outcome.success: | ||
outcome.expecting_failure = expecting_failure | ||
with outcome.testPartExecutor(self, isTest=True): | ||
testMethod() | ||
outcome.expecting_failure = False | ||
with outcome.testPartExecutor(self): | ||
self.tearDown() | ||
|
||
self._runTest(testMethod, outcome, expecting_failure) | ||
self.doCleanups() | ||
for test, reason in outcome.skipped: | ||
self._addSkip(result, test, reason) | ||
|
@@ -626,6 +627,7 @@ def run(self, result=None): | |
result.addSuccess(self) | ||
return result | ||
finally: | ||
self._terminateTest(outcome) | ||
result.stopTest(self) | ||
if orig_result is None: | ||
stopTestRun = getattr(result, 'stopTestRun', None) | ||
|
@@ -657,14 +659,27 @@ def doCleanups(self): | |
def __call__(self, *args, **kwds): | ||
return self.run(*args, **kwds) | ||
|
||
def _runTest(self, testMethod, outcome, expecting_failure): | ||
"""Run the test and collect errors into a TestResult""" | ||
with outcome.testPartExecutor(self): | ||
self.setUp() | ||
if outcome.success: | ||
outcome.expecting_failure = expecting_failure | ||
with outcome.testPartExecutor(self, isTest=True): | ||
testMethod() | ||
outcome.expecting_failure = False | ||
with outcome.testPartExecutor(self): | ||
self.tearDown() | ||
|
||
def _terminateTest(self, outcome): | ||
"""Hook that is called after a test run is complete.""" | ||
pass | ||
|
||
def debug(self): | ||
"""Run the test without collecting errors in a TestResult""" | ||
self.setUp() | ||
getattr(self, self._testMethodName)() | ||
self.tearDown() | ||
while self._cleanups: | ||
function, args, kwargs = self._cleanups.pop(-1) | ||
function(*args, **kwargs) | ||
self._runTest(getattr(self, self._testMethodName), | ||
_IgnoredOutcome(), None) | ||
self.doCleanups() | ||
|
||
def skipTest(self, reason): | ||
"""Skip this test.""" | ||
|
@@ -1439,3 +1454,150 @@ def shortDescription(self): | |
|
||
def __str__(self): | ||
return "{} {}".format(self.test_case, self._subDescription()) | ||
|
||
|
||
class AsyncioTestCase(TestCase): | ||
"""Extension of `unittest.TestCase` for concurrent test cases. | ||
|
||
This extension of `unittest.TestCase` runs the instance methods | ||
decorated with the **async**/**await** syntax on a event loop. | ||
The event loop is created anew for each test method run and | ||
destroyed when the test method has exited. Both the `setUp` and | ||
`tearDown` method are decorated with ``async``. Sub-classes that | ||
extend either method MUST call the parent method using the ``await`` | ||
keyword. | ||
|
||
If individual test methods are co-routines (as defined by | ||
``asyncio.iscoroutinefunction``), then they will be run on the | ||
active event loop; otherwise, they will be called as simple methods. | ||
In the latter case, the event loop IS NOT RUNNING; however, you | ||
can use it to run async code if necessary. | ||
|
||
Clean up functions will be run on the event loop if they are | ||
detected as co-routines; otherwise, they are called as standard | ||
functions. After ALL cleanup calls are completed, the event loop | ||
is stopped and closed. | ||
|
||
When subclassing AsyncioTestCase, the event loop is available | ||
via the ``event_loop`` property. Tests can assume that | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. about |
||
`self.event_loop` refers to the event loop created for the test | ||
case. It is also the active event loop as returned by | ||
``asyncio.get_event_loop()``. Test cases SHOULD NOT change the | ||
active event loop during the test run. | ||
|
||
The lifecycle of a single test execution is thus:: | ||
|
||
- ``TestCase`` instance is created | ||
- ``setUpClass`` is executed | ||
- for each test method | ||
- new event loop is created | ||
- ``setUp`` is executed on the event loop | ||
- if ``setUp`` succeeds | ||
- the test method is run on the event loop if it is a | ||
co-routine; otherwise it is simply called | ||
- ``tearDown`` is executed on the event loop | ||
- the event loop is stopped if it is somehow running | ||
- ``doCleanups`` is called | ||
- callables registered with ``addCleanup`` are called | ||
in reverse-order executing them on the event loop if | ||
necessary | ||
- the event loop is stopped if necessary | ||
- the event loop is closed and unregistered | ||
|
||
The process is tightly managed by the ``_runTest`` and ``doCleanups`` | ||
methods. Care must be taken if extending this class to ensure that | ||
the event loop is properly managed. | ||
|
||
""" | ||
|
||
def __init__(self, methodName='runTest'): | ||
super().__init__(methodName) | ||
self.__event_loop = None | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
"""Hook method for one-time initialization. | ||
|
||
Subclasses MUST invoke this method when extending it. | ||
""" | ||
super().setUpClass() | ||
cls.__saved_policy = asyncio.events._event_loop_policy | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
"""Hook method for one-time cleanup. | ||
|
||
Subclasses MUST invoke this method when extending it. | ||
""" | ||
super().tearDownClass() | ||
asyncio.set_event_loop_policy(cls.__saved_policy) | ||
|
||
async def asyncSetUp(self): | ||
"""Hook method for setting up the test fixture before exercising it. | ||
|
||
This method invokes ``setUp`` inline so subclasses MUST invoke this | ||
method using the ``await`` keyword when extending it. | ||
""" | ||
self.setUp() | ||
|
||
async def asyncTearDown(self): | ||
"""Hook method for deconstructing the test fixture after testing it. | ||
|
||
This method invokes ``tearDown`` inline so subclasses MUST invoke | ||
this method using the ``await`` keyword when extending it. | ||
""" | ||
self.tearDown() | ||
|
||
@property | ||
def event_loop(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in the documentation, we prefer to use the Examples: maybe you should do the same. |
||
"""Active event loop for the test case.""" | ||
if self.__event_loop is None: | ||
self.__event_loop = asyncio.new_event_loop() | ||
self.__event_loop.set_debug(True) | ||
asyncio.set_event_loop(self.__event_loop) | ||
return self.__event_loop | ||
|
||
def _runTest(self, testMethod, outcome, expecting_failure): | ||
try: | ||
with outcome.testPartExecutor(self): | ||
self.event_loop.run_until_complete(self.asyncSetUp()) | ||
if outcome.success: | ||
outcome.expecting_failure = expecting_failure | ||
with outcome.testPartExecutor(self, isTest=True): | ||
if asyncio.iscoroutinefunction(testMethod): | ||
self.event_loop.run_until_complete(testMethod()) | ||
else: | ||
testMethod() | ||
outcome.expecting_failure = False | ||
with outcome.testPartExecutor(self): | ||
self.event_loop.run_until_complete(self.asyncTearDown()) | ||
finally: | ||
if self.event_loop.is_running(): | ||
self.event_loop.stop() | ||
|
||
def _terminateTest(self, outcome): | ||
super()._terminateTest(outcome) | ||
if self.event_loop.is_running(): | ||
self.event_loop.stop() | ||
self.event_loop.run_until_complete( | ||
self.event_loop.shutdown_asyncgens()) | ||
asyncio.set_event_loop(None) | ||
if not self.event_loop.is_closed(): | ||
self.event_loop.close() | ||
self.__event_loop = None | ||
|
||
def doCleanups(self): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The docstring should start on the first line, https://www.python.org/dev/peps/pep-0008/#documentation-strings |
||
Execute all cleanup functions. | ||
|
||
Normally called for you after tearDown. | ||
""" | ||
outcome = self._outcome or _Outcome() | ||
while self._cleanups: | ||
cleanup_func, args, kwargs = self._cleanups.pop() | ||
with outcome.testPartExecutor(self): | ||
if asyncio.iscoroutinefunction(cleanup_func): | ||
self.event_loop.run_until_complete( | ||
cleanup_func(*args, **kwargs)) | ||
else: | ||
cleanup_func(*args, **kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Python3, we prefer to use the new style
https://docs.python.org/3/reference/compound_stmts.html#class-definitions
is equivalent to
so in this case, use the new style and not the old one.