|
1 | 1 | """Test the interactive interpreter.""" |
2 | 2 |
|
| 3 | +from io import BytesIO |
3 | 4 | import os |
| 5 | +from re import sub |
4 | 6 | import subprocess |
| 7 | +import tempfile |
5 | 8 | import sys |
6 | 9 | import unittest |
7 | 10 | from textwrap import dedent |
8 | 11 | from test import support |
9 | | -from test.support import cpython_only, has_subprocess_support, SuppressCrashReport |
| 12 | +from test.support import ( |
| 13 | + cpython_only, |
| 14 | + has_subprocess_support, |
| 15 | + is_android, |
| 16 | + is_apple_mobile, |
| 17 | + is_emscripten, |
| 18 | + is_wasi, |
| 19 | + SuppressCrashReport, |
| 20 | + reap_children, |
| 21 | +) |
10 | 22 | from test.support.script_helper import assert_python_failure, kill_python, assert_python_ok |
11 | 23 | from test.support.import_helper import import_module |
12 | 24 |
|
@@ -198,6 +210,55 @@ def bar(x): |
198 | 210 | def test_asyncio_repl_no_tty_fails(self): |
199 | 211 | assert assert_python_failure("-m", "asyncio") |
200 | 212 |
|
| 213 | + def test_asyncio_repl_reaches_python_startup_script(self): |
| 214 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 215 | + script = os.path.join(tmpdir, "script.py") |
| 216 | + with open(script, "w") as f: |
| 217 | + f.write("exit(0)") |
| 218 | + |
| 219 | + subprocess.check_call( |
| 220 | + [sys.executable, "-m", "asyncio"], |
| 221 | + stdout=subprocess.PIPE, |
| 222 | + stderr=subprocess.PIPE, |
| 223 | + env={"PYTHONSTARTUP": script} |
| 224 | + ) |
| 225 | + |
| 226 | + def test_asyncio_repl_is_ok(self): |
| 227 | + # The REPL expects a tty to go into the interactive mode, so let's |
| 228 | + # simulate one. This test is limited to Linux since |
| 229 | + # it uses the pty module. |
| 230 | + try: |
| 231 | + import_module('termios') |
| 232 | + if is_android or is_apple_mobile or is_emscripten or is_wasi: |
| 233 | + raise unittest.SkipTest("pty is not available on this platform") |
| 234 | + |
| 235 | + import pty |
| 236 | + |
| 237 | + m, s = pty.openpty() |
| 238 | + try: |
| 239 | + with open(s, "rb") as proc_file, open(m, "w") as input_file: |
| 240 | + proc = subprocess.Popen( |
| 241 | + [ |
| 242 | + sys.executable, |
| 243 | + "-m", |
| 244 | + "asyncio", |
| 245 | + ], |
| 246 | + stdin=proc_file, |
| 247 | + stdout=proc_file, |
| 248 | + stderr=proc_file, |
| 249 | + ) |
| 250 | + input_file.write("exit()\n") |
| 251 | + proc.wait() |
| 252 | + finally: |
| 253 | + for fd in [m, s]: |
| 254 | + try: |
| 255 | + os.close(fd) |
| 256 | + except OSError: |
| 257 | + pass |
| 258 | + |
| 259 | + self.assertEqual(proc.returncode, 0) |
| 260 | + finally: |
| 261 | + reap_children() |
201 | 262 |
|
202 | 263 | class TestInteractiveModeSyntaxErrors(unittest.TestCase): |
203 | 264 |
|
|
0 commit comments