Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-85984: Utilize new "winsize" functions from termios in pty tests. #101831

Merged
merged 5 commits into from
Feb 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 27 additions & 69 deletions Lib/test/test_pty.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

# Skip these tests if termios or fcntl are not available
import_module('termios')
# fcntl is a proxy for not being one of the wasm32 platforms even though we
# don't use this module... a proper check for what crashes those is needed.
import_module("fcntl")

import errno
Expand All @@ -15,20 +17,12 @@
import socket
import io # readline
import unittest

import struct
import fcntl
gpshead marked this conversation as resolved.
Show resolved Hide resolved
import warnings

TEST_STRING_1 = b"I wish to buy a fish license.\n"
TEST_STRING_2 = b"For my pet fish, Eric.\n"

try:
_TIOCGWINSZ = tty.TIOCGWINSZ
_TIOCSWINSZ = tty.TIOCSWINSZ
_HAVE_WINSZ = True
except AttributeError:
_HAVE_WINSZ = False
_HAVE_WINSZ = hasattr(tty, "TIOCGWINSZ") and hasattr(tty, "TIOCSWINSZ")

if verbose:
def debug(msg):
Expand Down Expand Up @@ -82,14 +76,6 @@ def expectedFailureIfStdinIsTTY(fun):
pass
return fun

def _get_term_winsz(fd):
s = struct.pack("HHHH", 0, 0, 0, 0)
return fcntl.ioctl(fd, _TIOCGWINSZ, s)

def _set_term_winsz(fd, winsz):
fcntl.ioctl(fd, _TIOCSWINSZ, winsz)


# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
# because pty code is not too portable.
class PtyTest(unittest.TestCase):
Expand All @@ -105,18 +91,14 @@ def setUp(self):
self.addCleanup(signal.alarm, 0)
signal.alarm(10)

# Save original stdin window size
self.stdin_rows = None
self.stdin_cols = None
# Save original stdin window size.
self.stdin_dim = None
if _HAVE_WINSZ:
try:
stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
self.stdin_rows = stdin_dim.lines
self.stdin_cols = stdin_dim.columns
old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
self.stdin_cols, 0, 0)
self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
except OSError:
self.stdin_dim = tty.tcgetwinsize(pty.STDIN_FILENO)
self.addCleanup(tty.tcsetwinsize, pty.STDIN_FILENO,
self.stdin_dim)
except tty.error:
pass

def handle_sig(self, sig, frame):
Expand All @@ -131,41 +113,40 @@ def test_openpty(self):
try:
mode = tty.tcgetattr(pty.STDIN_FILENO)
except tty.error:
# not a tty or bad/closed fd
# Not a tty or bad/closed fd.
debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
mode = None

new_stdin_winsz = None
if self.stdin_rows is not None and self.stdin_cols is not None:
new_dim = None
if self.stdin_dim:
try:
# Modify pty.STDIN_FILENO window size; we need to
# check if pty.openpty() is able to set pty slave
# window size accordingly.
debug("Setting pty.STDIN_FILENO window size")
debug(f"original size: (rows={self.stdin_rows}, cols={self.stdin_cols})")
target_stdin_rows = self.stdin_rows + 1
target_stdin_cols = self.stdin_cols + 1
debug(f"target size: (rows={target_stdin_rows}, cols={target_stdin_cols})")
target_stdin_winsz = struct.pack("HHHH", target_stdin_rows,
target_stdin_cols, 0, 0)
_set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)
debug("Setting pty.STDIN_FILENO window size.")
debug(f"original size: (row, col) = {self.stdin_dim}")
target_dim = (self.stdin_dim[0] + 1, self.stdin_dim[1] + 1)
debug(f"target size: (row, col) = {target_dim}")
tty.tcsetwinsize(pty.STDIN_FILENO, target_dim)

# Were we able to set the window size
# of pty.STDIN_FILENO successfully?
new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
self.assertEqual(new_stdin_winsz, target_stdin_winsz,
new_dim = tty.tcgetwinsize(pty.STDIN_FILENO)
self.assertEqual(new_dim, target_dim,
"pty.STDIN_FILENO window size unchanged")
except OSError:
warnings.warn("Failed to set pty.STDIN_FILENO window size")
warnings.warn("Failed to set pty.STDIN_FILENO window size.")
pass

try:
debug("Calling pty.openpty()")
try:
master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
master_fd, slave_fd, slave_name = pty.openpty(mode, new_dim,
True)
except TypeError:
master_fd, slave_fd = pty.openpty()
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
slave_name = None
debug(f"Got {master_fd=}, {slave_fd=}, {slave_name=}")
except OSError:
# " An optional feature could not be imported " ... ?
raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")
Expand All @@ -181,8 +162,8 @@ def test_openpty(self):
if mode:
self.assertEqual(tty.tcgetattr(slave_fd), mode,
"openpty() failed to set slave termios")
if new_stdin_winsz:
self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
if new_dim:
self.assertEqual(tty.tcgetwinsize(slave_fd), new_dim,
"openpty() failed to set slave window size")

# Ensure the fd is non-blocking in case there's nothing to read.
Expand Down Expand Up @@ -367,9 +348,8 @@ def _socketpair(self):
self.files.extend(socketpair)
return socketpair

def _mock_select(self, rfds, wfds, xfds, timeout=0):
def _mock_select(self, rfds, wfds, xfds):
# This will raise IndexError when no more expected calls exist.
# This ignores the timeout
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
return self.select_rfds_results.pop(0), [], []

Expand Down Expand Up @@ -409,28 +389,6 @@ def test__copy_to_each(self):
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')

def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]

socketpair[1].close()
os.close(write_to_stdin_fd)

pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)

# We expect the function to return without error.
self.assertEqual(pty._copy(masters[0]), None)

def test__restore_tty_mode_normal_return(self):
"""Test that spawn resets the tty mode no when _copy returns normally."""

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Utilize new "winsize" functions from termios in pty tests.