Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-32604: [_xxsubinterpreters] Propagate exceptions. #19768

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 299 additions & 2 deletions Lib/test/test__xxsubinterpreters.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import builtins
from collections import namedtuple
import contextlib
import itertools
Expand Down Expand Up @@ -885,10 +886,11 @@ def assert_run_failed(self, exctype, msg=None):
yield
if msg is None:
self.assertEqual(str(caught.exception).split(':')[0],
str(exctype))
exctype.__name__)
else:
self.assertEqual(str(caught.exception),
"{}: {}".format(exctype, msg))
"{}: {}".format(exctype.__name__, msg))
self.assertIsInstance(caught.exception.__cause__, exctype)

def test_invalid_syntax(self):
with self.assert_run_failed(SyntaxError):
Expand Down Expand Up @@ -1079,6 +1081,301 @@ def f():
self.assertEqual(retcode, 0)


def build_exception(exctype, /, *args, **kwargs):
# XXX Use __qualname__?
name = exctype.__name__
argreprs = [repr(a) for a in args]
if kwargs:
kwargreprs = [f'{k}={v!r}' for k, v in kwargs.items()]
script = f'{name}({", ".join(argreprs)}, {", ".join(kwargreprs)})'
else:
script = f'{name}({", ".join(argreprs)})'
expected = exctype(*args, **kwargs)
return script, expected


def build_exceptions(self, *exctypes, default=None, custom=None, bases=True):
if not exctypes:
raise NotImplementedError
if not default:
default = ((), {})
elif isinstance(default, str):
default = ((default,), {})
elif type(default) is not tuple:
raise NotImplementedError
elif len(default) != 2:
default = (default, {})
elif type(default[0]) is not tuple:
default = (default, {})
elif type(default[1]) is not dict:
default = (default, {})
# else leave it alone

for exctype in exctypes:
customtype = None
values = default
if custom:
if exctype in custom:
customtype = exctype
elif bases:
for customtype in custom:
if issubclass(exctype, customtype):
break
else:
customtype = None
if customtype is not None:
values = custom[customtype]
if values is None:
continue
args, kwargs = values
script, expected = build_exception(exctype, *args, **kwargs)
yield exctype, customtype, script, expected


try:
raise Exception
except Exception as exc:
assert exc.__traceback__ is not None
Traceback = type(exc.__traceback__)


class RunFailedTests(TestBase):

BUILTINS = [v
for v in vars(builtins).values()
if (type(v) is type
and issubclass(v, Exception)
#and issubclass(v, BaseException)
)
]
BUILTINS_SPECIAL = [
# These all have extra attributes (i.e. args/kwargs)
SyntaxError,
ImportError,
UnicodeError,
OSError,
SystemExit,
StopIteration,
]

@classmethod
def build_exceptions(cls, exctypes=None, default=(), custom=None):
if exctypes is None:
exctypes = cls.BUILTINS
if custom is None:
# Skip the "special" ones.
custom = {et: None for et in cls.BUILTINS_SPECIAL}
yield from build_exceptions(*exctypes, default=default, custom=custom)

def assertExceptionsEqual(self, exc, expected, *, chained=True):
if type(expected) is type:
self.assertIs(type(exc), expected)
return
elif not isinstance(exc, Exception):
self.assertEqual(exc, expected)
elif not isinstance(expected, Exception):
self.assertEqual(exc, expected)
else:
# Plain equality doesn't work, so we have to compare manually.
self.assertIs(type(exc), type(expected))
self.assertEqual(exc.args, expected.args)
self.assertEqual(exc.__reduce__(), expected.__reduce__())
if chained:
self.assertExceptionsEqual(exc.__context__,
expected.__context__)
self.assertExceptionsEqual(exc.__cause__,
expected.__cause__)
self.assertEqual(exc.__suppress_context__,
expected.__suppress_context__)

def assertTracebacksEqual(self, tb, expected):
if not isinstance(tb, Traceback):
self.assertEqual(tb, expected)
elif not isinstance(expected, Traceback):
self.assertEqual(tb, expected)
else:
self.assertEqual(tb.tb_frame.f_code.co_name,
expected.tb_frame.f_code.co_name)
self.assertEqual(tb.tb_frame.f_code.co_filename,
expected.tb_frame.f_code.co_filename)
self.assertEqual(tb.tb_lineno, expected.tb_lineno)
self.assertTracebacksEqual(tb.tb_next, expected.tb_next)

# XXX Move this to TestBase?
@contextlib.contextmanager
def expected_run_failure(self, expected):
exctype = expected if type(expected) is type else type(expected)

with self.assertRaises(interpreters.RunFailedError) as caught:
yield caught
exc = caught.exception

modname = exctype.__module__
if modname == 'builtins' or modname == '__main__':
exctypename = exctype.__name__
else:
exctypename = f'{modname}.{exctype.__name__}'
if exctype is expected:
self.assertEqual(str(exc).split(':')[0], exctypename)
else:
self.assertEqual(str(exc), f'{exctypename}: {expected}')
self.assertExceptionsEqual(exc.__cause__, expected)
if exc.__cause__ is not None:
self.assertIsNotNone(exc.__cause__.__traceback__)

def test_builtin_exceptions(self):
interpid = interpreters.create()
msg = '<a message>'
for i, info in enumerate(self.build_exceptions(
default=msg,
custom={
SyntaxError: ((msg, '<stdin>', 1, 3, 'a +?'), {}),
ImportError: ((msg,), {'name': 'spam', 'path': '/x/spam.py'}),
UnicodeError: None,
#UnicodeError: ((), {}),
#OSError: ((), {}),
SystemExit: ((1,), {}),
StopIteration: (('<a value>',), {}),
},
)):
exctype, _, script, expected = info
testname = f'{i+1} - {script}'
script = f'raise {script}'

with self.subTest(testname):
with self.expected_run_failure(expected):
interpreters.run_string(interpid, script)

def test_custom_exception_from___main__(self):
script = dedent("""
class SpamError(Exception):
def __init__(self, q):
super().__init__(f'got {q}')
self.q = q
raise SpamError('eggs')
""")
expected = Exception(f'SpamError: got {"eggs"}')

interpid = interpreters.create()
with self.assertRaises(interpreters.RunFailedError) as caught:
interpreters.run_string(interpid, script)
cause = caught.exception.__cause__

self.assertExceptionsEqual(cause, expected)

class SpamError(Exception):
# The normal Exception.__reduce__() produces a funny result
# here. So we have to use a custom __new__().
def __new__(cls, q):
if type(q) is SpamError:
return q
return super().__new__(cls, q)
def __init__(self, q):
super().__init__(f'got {q}')
self.q = q

def test_custom_exception(self):
script = dedent("""
import test.test__xxsubinterpreters
SpamError = test.test__xxsubinterpreters.RunFailedTests.SpamError
raise SpamError('eggs')
""")
try:
ns = {}
exec(script, ns, ns)
except Exception as exc:
expected = exc

interpid = interpreters.create()
with self.expected_run_failure(expected):
interpreters.run_string(interpid, script)

class SpamReducedError(Exception):
def __init__(self, q):
super().__init__(f'got {q}')
self.q = q
def __reduce__(self):
return (type(self), (self.q,), {})

def test_custom___reduce__(self):
script = dedent("""
import test.test__xxsubinterpreters
SpamError = test.test__xxsubinterpreters.RunFailedTests.SpamReducedError
raise SpamError('eggs')
""")
try:
exec(script, (ns := {'__name__': '__main__'}), ns)
except Exception as exc:
expected = exc

interpid = interpreters.create()
with self.expected_run_failure(expected):
interpreters.run_string(interpid, script)

def test_traceback_propagated(self):
script = dedent("""
def do_spam():
raise Exception('uh-oh')
def do_eggs():
return do_spam()
class Spam:
def do(self):
return do_eggs()
def get_handler():
def handler():
return Spam().do()
return handler
go = (lambda: get_handler()())
def iter_all():
yield from (go() for _ in [True])
yield None
def main():
for v in iter_all():
pass
main()
""")
try:
ns = {}
exec(script, ns, ns)
except Exception as exc:
expected = exc
expectedtb = exc.__traceback__.tb_next

interpid = interpreters.create()
with self.expected_run_failure(expected) as caught:
interpreters.run_string(interpid, script)
exc = caught.exception

self.assertTracebacksEqual(exc.__cause__.__traceback__,
expectedtb)

def test_chained_exceptions(self):
script = dedent("""
try:
raise ValueError('msg 1')
except Exception as exc1:
try:
raise TypeError('msg 2')
except Exception as exc2:
try:
raise IndexError('msg 3') from exc2
except Exception:
raise AttributeError('msg 4')
""")
try:
exec(script, {}, {})
except Exception as exc:
expected = exc

interpid = interpreters.create()
with self.expected_run_failure(expected) as caught:
interpreters.run_string(interpid, script)
exc = caught.exception

# ...just to be sure.
self.assertIs(type(exc.__cause__), AttributeError)


##################################
# channel tests

Expand Down
Loading