|
8 | 8 | import importlib.util
|
9 | 9 | import os
|
10 | 10 | import pathlib
|
| 11 | +import re |
11 | 12 | import string
|
12 | 13 | import sys
|
13 | 14 | from test import support
|
| 15 | +import textwrap |
14 | 16 | import types
|
15 | 17 | import unittest
|
16 | 18 | import unittest.mock
|
17 | 19 | import warnings
|
18 | 20 |
|
| 21 | +try: |
| 22 | + import _testsinglephase |
| 23 | +except ImportError: |
| 24 | + _testsinglephase = None |
| 25 | +try: |
| 26 | + import _testmultiphase |
| 27 | +except ImportError: |
| 28 | + _testmultiphase = None |
| 29 | +try: |
| 30 | + import _xxsubinterpreters as _interpreters |
| 31 | +except ModuleNotFoundError: |
| 32 | + _interpreters = None |
| 33 | + |
19 | 34 |
|
20 | 35 | class DecodeSourceBytesTests:
|
21 | 36 |
|
@@ -637,5 +652,111 @@ def test_magic_number(self):
|
637 | 652 | self.assertEqual(EXPECTED_MAGIC_NUMBER, actual, msg)
|
638 | 653 |
|
639 | 654 |
|
| 655 | +@unittest.skipIf(_interpreters is None, 'subinterpreters required') |
| 656 | +class AllowingAllExtensionsTests(unittest.TestCase): |
| 657 | + |
| 658 | + ERROR = re.compile("^<class 'ImportError'>: module (.*) does not support loading in subinterpreters") |
| 659 | + |
| 660 | + def run_with_own_gil(self, script): |
| 661 | + interpid = _interpreters.create(isolated=True) |
| 662 | + try: |
| 663 | + _interpreters.run_string(interpid, script) |
| 664 | + except _interpreters.RunFailedError as exc: |
| 665 | + if m := self.ERROR.match(str(exc)): |
| 666 | + modname, = m.groups() |
| 667 | + raise ImportError(modname) |
| 668 | + |
| 669 | + def run_with_shared_gil(self, script): |
| 670 | + interpid = _interpreters.create(isolated=False) |
| 671 | + try: |
| 672 | + _interpreters.run_string(interpid, script) |
| 673 | + except _interpreters.RunFailedError as exc: |
| 674 | + if m := self.ERROR.match(str(exc)): |
| 675 | + modname, = m.groups() |
| 676 | + raise ImportError(modname) |
| 677 | + |
| 678 | + @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module") |
| 679 | + def test_single_phase_init_module(self): |
| 680 | + script = textwrap.dedent(''' |
| 681 | + import importlib.util |
| 682 | + with importlib.util.allowing_all_extensions(): |
| 683 | + import _testsinglephase |
| 684 | + ''') |
| 685 | + with self.subTest('check disabled, shared GIL'): |
| 686 | + self.run_with_shared_gil(script) |
| 687 | + with self.subTest('check disabled, per-interpreter GIL'): |
| 688 | + self.run_with_own_gil(script) |
| 689 | + |
| 690 | + script = textwrap.dedent(f''' |
| 691 | + import importlib.util |
| 692 | + with importlib.util.allowing_all_extensions(False): |
| 693 | + import _testsinglephase |
| 694 | + ''') |
| 695 | + with self.subTest('check enabled, shared GIL'): |
| 696 | + with self.assertRaises(ImportError): |
| 697 | + self.run_with_shared_gil(script) |
| 698 | + with self.subTest('check enabled, per-interpreter GIL'): |
| 699 | + with self.assertRaises(ImportError): |
| 700 | + self.run_with_own_gil(script) |
| 701 | + |
| 702 | + @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") |
| 703 | + def test_incomplete_multi_phase_init_module(self): |
| 704 | + prescript = textwrap.dedent(f''' |
| 705 | + from importlib.util import spec_from_loader, module_from_spec |
| 706 | + from importlib.machinery import ExtensionFileLoader |
| 707 | +
|
| 708 | + name = '_test_shared_gil_only' |
| 709 | + filename = {_testmultiphase.__file__!r} |
| 710 | + loader = ExtensionFileLoader(name, filename) |
| 711 | + spec = spec_from_loader(name, loader) |
| 712 | +
|
| 713 | + ''') |
| 714 | + |
| 715 | + script = prescript + textwrap.dedent(''' |
| 716 | + import importlib.util |
| 717 | + with importlib.util.allowing_all_extensions(): |
| 718 | + module = module_from_spec(spec) |
| 719 | + loader.exec_module(module) |
| 720 | + ''') |
| 721 | + with self.subTest('check disabled, shared GIL'): |
| 722 | + self.run_with_shared_gil(script) |
| 723 | + with self.subTest('check disabled, per-interpreter GIL'): |
| 724 | + self.run_with_own_gil(script) |
| 725 | + |
| 726 | + script = prescript + textwrap.dedent(''' |
| 727 | + import importlib.util |
| 728 | + with importlib.util.allowing_all_extensions(False): |
| 729 | + module = module_from_spec(spec) |
| 730 | + loader.exec_module(module) |
| 731 | + ''') |
| 732 | + with self.subTest('check enabled, shared GIL'): |
| 733 | + self.run_with_shared_gil(script) |
| 734 | + with self.subTest('check enabled, per-interpreter GIL'): |
| 735 | + with self.assertRaises(ImportError): |
| 736 | + self.run_with_own_gil(script) |
| 737 | + |
| 738 | + @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") |
| 739 | + def test_complete_multi_phase_init_module(self): |
| 740 | + script = textwrap.dedent(''' |
| 741 | + import importlib.util |
| 742 | + with importlib.util.allowing_all_extensions(): |
| 743 | + import _testmultiphase |
| 744 | + ''') |
| 745 | + with self.subTest('check disabled, shared GIL'): |
| 746 | + self.run_with_shared_gil(script) |
| 747 | + with self.subTest('check disabled, per-interpreter GIL'): |
| 748 | + self.run_with_own_gil(script) |
| 749 | + |
| 750 | + script = textwrap.dedent(f''' |
| 751 | + import importlib.util |
| 752 | + with importlib.util.allowing_all_extensions(False): |
| 753 | + import _testmultiphase |
| 754 | + ''') |
| 755 | + with self.subTest('check enabled, shared GIL'): |
| 756 | + self.run_with_shared_gil(script) |
| 757 | + with self.subTest('check enabled, per-interpreter GIL'): |
| 758 | + self.run_with_own_gil(script) |
| 759 | + |
| 760 | + |
640 | 761 | if __name__ == '__main__':
|
641 | 762 | unittest.main()
|
0 commit comments