Skip to content
This repository has been archived by the owner on Nov 29, 2022. It is now read-only.

Commit

Permalink
Fixes #80: stdin exposed the parent tty to the spawned program
Browse files Browse the repository at this point in the history
  • Loading branch information
hSaria committed Dec 28, 2021
1 parent f2ee239 commit ad02ea5
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 45 deletions.
79 changes: 52 additions & 27 deletions chromaterm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,39 +287,44 @@ def read_ready(*read_fds, timeout=None):

def run_program(program_args):
"""Fork a program over a pty of which the master fd is returned."""
import atexit
import fcntl
import termios
import pty
import shutil
import struct
import tty

# Create the pty file decriptors
master_fd, slave_fd = os.openpty()

# Update terminal size and attributes on the program's (slave) pty
# Save the current tty's window size and attributes (used by slave pty)
window_size = shutil.get_terminal_size()
window_size = struct.pack('2H', window_size.lines, window_size.columns)
fcntl.ioctl(slave_fd, termios.TIOCSWINSZ, window_size)

try:
attributes = termios.tcgetattr(sys.stdin.fileno())
termios.tcsetattr(slave_fd, termios.TCSANOW, attributes)

# Set to raw as the pty will be handling any processing
tty.setraw(sys.stdin.fileno())
atexit.register(termios.tcsetattr, sys.stdin.fileno(), termios.TCSANOW,
attributes)
except termios.error:
pass
attributes = None

pid, master_fd = pty.fork() # openpty, login_tty, then fork

if os.fork() == 0: # Program
os.dup2(slave_fd, sys.stdout.fileno())
os.dup2(slave_fd, sys.stderr.fileno())
os.close(master_fd)
os.close(slave_fd)
if pid == 0: # Program
# Update the slave's pty (now on std fds) window size and attributes
fcntl.ioctl(sys.stdin.fileno(), termios.TIOCSWINSZ, window_size)

if attributes:
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, attributes)

try:
os.execvp(program_args[0], program_args)
except FileNotFoundError:
eprint(program_args[0] + ': command not found')

sys.exit()
sys.exit(1) # Shouldn't be hit as exec replaces the fork's process
else: # CT
os.close(slave_fd)
return master_fd


Expand Down Expand Up @@ -362,22 +367,42 @@ def main(config, max_wait=None, read_fd=None):
`max_wait` is the longest period to wait without input before returning.
read_fd will default to stdin if not specified. If config['read_fd'] is set,
the read_fd keyword is ignored."""
if read_fd is None:
read_fd = sys.stdin.fileno()
# data piped over stdin
if config.get('read_fd', read_fd) in [None, sys.stdin.fileno()]:
config['read_fd'] = sys.stdin.fileno()
fds = [config['read_fd']]
# data coming from pty, stdin is forwarded to the pty
else:
config['read_fd'] = config.get('read_fd', read_fd)
fds = [sys.stdin.fileno(), config['read_fd']]

buffer = ''
config['read_fd'] = config.get('read_fd', read_fd)
ready_fds = read_ready(*fds, timeout=max_wait)

while read_ready(config['read_fd'], timeout=max_wait):
try:
data = os.read(config['read_fd'], READ_SIZE)
except OSError:
data = b''
while ready_fds:
# stdin has data (piped) or has input to forward to the pty
if sys.stdin.fileno() in ready_fds:
data = os.read(sys.stdin.fileno(), READ_SIZE)

buffer += data.decode(encoding='utf-8', errors='replace')
if sys.stdin.fileno() == config['read_fd']: # stdin is data
buffer += data.decode(encoding='utf-8', errors='replace')
else: # stdin is forwarded to pty
os.write(config['read_fd'], data)

if not buffer: # Buffer was processed empty and data fd hit EOF
break
if not data: # stdin or pty closed; don't forward anymore
fds.remove(sys.stdin.fileno())

# Data was received. If read_fd is stdin, then just process the buffer
# as the data was already read above. Otherwise, read_fd is pty.
if config['read_fd'] in ready_fds:
if config['read_fd'] != sys.stdin.fileno():
data = os.read(config['read_fd'], READ_SIZE)
buffer += data.decode(encoding='utf-8', errors='replace')

if not buffer: # Buffer was processed empty and data fd hit EOF
break

# Process the buffer, updating it with any left-over data
buffer = process_buffer(config, buffer, bool(data))

# Process the buffer, updating it with any left-over data
buffer = process_buffer(config, buffer, bool(data))
ready_fds = read_ready(*fds, timeout=max_wait)
97 changes: 79 additions & 18 deletions chromaterm/test___init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re
import signal
import subprocess
import sys
import time
from threading import Thread

Expand All @@ -18,11 +19,25 @@

TEMP_FILE = '.test_chromaterm.yml'

TTY_CODE = """import os, sys
t_stdin = os.isatty(sys.stdin.fileno())
t_stdout = os.isatty(sys.stdout.fileno())
print('stdin={}, stdout={}'.format(t_stdin, t_stdout))"""
TTY_PROGRAM = 'python3 -c "{}"'.format('; '.join(TTY_CODE.splitlines()))
CODE_ISATTY = """import os, sys
stdin = os.isatty(sys.stdin.fileno())
stdout = os.isatty(sys.stdout.fileno())
print('stdin={}, stdout={}'.format(stdin, stdout))"""

CODE_TTYNAME = """import os, sys
print(os.ttyname(sys.stdin.fileno()) if os.isatty(sys.stdin.fileno()) else None)"""

_, MOCK_FD = os.openpty()


def mock_fd_fileno():
"""Returns MOCK_FD. To be used when monkeypatching sys.stdin.fileno"""
return MOCK_FD


def get_python_command(code):
"""Returns the python shell command that runs `code`."""
return 'python3 -c "{}"'.format('; '.join(code.splitlines()))


def test_decode_sgr_bg():
Expand Down Expand Up @@ -1055,7 +1070,7 @@ def test_split_buffer_ecma_048_osc_title():
def test_tty_test_code_no_pipe():
"""Baseline the test code with no pipes on stdin or stdout."""
master, slave = os.openpty()
subprocess.run(TTY_PROGRAM,
subprocess.run(get_python_command(CODE_ISATTY),
check=True,
shell=True,
stdin=slave,
Expand All @@ -1066,7 +1081,7 @@ def test_tty_test_code_no_pipe():
def test_tty_test_code_in_pipe():
"""Baseline the test code with a pipe on stdin."""
master, slave = os.openpty()
subprocess.run(TTY_PROGRAM,
subprocess.run(get_python_command(CODE_ISATTY),
check=True,
shell=True,
stdin=subprocess.PIPE,
Expand All @@ -1077,7 +1092,7 @@ def test_tty_test_code_in_pipe():
def test_tty_test_code_out_pipe():
"""Baseline the test code with a pipe on stdout."""
_, slave = os.openpty()
result = subprocess.run(TTY_PROGRAM,
result = subprocess.run(get_python_command(CODE_ISATTY),
check=True,
shell=True,
stdin=slave,
Expand All @@ -1087,18 +1102,47 @@ def test_tty_test_code_out_pipe():

def test_tty_test_code_in_out_pipe():
"""Baseline the test code with pipes on stdin and stdout."""
result = subprocess.run(TTY_PROGRAM,
result = subprocess.run(get_python_command(CODE_ISATTY),
check=True,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
assert 'stdin=False, stdout=False' in result.stdout.decode()


def test_main(capsys):
def test_tty_test_code_ttyname_same():
"""Baseline the ttyname code, ensuring it detects matching ttys."""
master, slave = os.openpty()

subprocess.run(get_python_command(CODE_TTYNAME),
check=True,
shell=True,
stdin=slave,
stdout=slave)

assert os.ttyname(slave) in os.read(master, 1024).decode()


def test_tty_test_code_ttyname_different():
"""Baseline the ttyname code, ensuring it detects different ttys."""
master, slave = os.openpty()
another_master, another_slave = os.openpty()

subprocess.run(get_python_command(CODE_TTYNAME),
check=True,
shell=True,
stdin=slave,
stdout=slave)

assert os.ttyname(another_slave) not in os.read(master, 1024).decode()


def test_main(capsys, monkeypatch):
"""General stdin processing."""
try:
pipe_r, pipe_w = os.pipe()
monkeypatch.setattr(sys.stdin, 'fileno', mock_fd_fileno)

config = chromaterm.args_init([])
main_thread = Thread(target=chromaterm.main, args=(config, 1, pipe_r))

Expand Down Expand Up @@ -1142,10 +1186,12 @@ def test_main_buffer_close_time():
assert after - before < 1


def test_main_decode_error(capsys):
def test_main_decode_error(capsys, monkeypatch):
"""Attempt to decode a character that is not UTF-8."""
try:
pipe_r, pipe_w = os.pipe()
monkeypatch.setattr(sys.stdin, 'fileno', mock_fd_fileno)

config = chromaterm.args_init([])
main_thread = Thread(target=chromaterm.main, args=(config, 1, pipe_r))

Expand All @@ -1162,7 +1208,7 @@ def test_main_decode_error(capsys):
main_thread.join()


def test_main_reload_config(capsys):
def test_main_reload_config(capsys, monkeypatch):
"""Reload the configuration while the program is running."""
try:
with open(TEMP_FILE + '1', 'w') as file:
Expand All @@ -1173,6 +1219,8 @@ def test_main_reload_config(capsys):
color: b#321321''')

pipe_r, pipe_w = os.pipe()
monkeypatch.setattr(sys.stdin, 'fileno', mock_fd_fileno)

config = chromaterm.args_init(['--config', TEMP_FILE + '1'])
main_thread = Thread(target=chromaterm.main, args=(config, 1, pipe_r))

Expand Down Expand Up @@ -1234,7 +1282,7 @@ def test_main_run_no_file_found():
def test_main_run_no_pipe():
"""Have CT run the tty test code with no pipes."""
master, slave = os.openpty()
subprocess.run('./ct ' + TTY_PROGRAM,
subprocess.run('./ct ' + get_python_command(CODE_ISATTY),
check=True,
shell=True,
stdin=slave,
Expand All @@ -1245,18 +1293,18 @@ def test_main_run_no_pipe():
def test_main_run_in_pipe():
"""Have CT run the tty test code with a pipe on stdin."""
master, slave = os.openpty()
subprocess.run('./ct ' + TTY_PROGRAM,
subprocess.run('./ct ' + get_python_command(CODE_ISATTY),
check=True,
shell=True,
stdin=subprocess.PIPE,
stdout=slave)
assert 'stdin=False, stdout=True' in os.read(master, 1024).decode()
assert 'stdin=True, stdout=True' in os.read(master, 1024).decode()


def test_main_run_out_pipe():
"""Have CT run the tty test code with a pipe on stdout."""
_, slave = os.openpty()
result = subprocess.run('./ct ' + TTY_PROGRAM,
result = subprocess.run('./ct ' + get_python_command(CODE_ISATTY),
check=True,
shell=True,
stdin=slave,
Expand All @@ -1266,9 +1314,22 @@ def test_main_run_out_pipe():

def test_main_run_in_out_pipe():
"""Have CT run the tty test code with pipes on stdin and stdout."""
result = subprocess.run('./ct ' + TTY_PROGRAM,
result = subprocess.run('./ct ' + get_python_command(CODE_ISATTY),
check=True,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
assert 'stdin=False, stdout=True' in result.stdout.decode()
assert 'stdin=True, stdout=True' in result.stdout.decode()


def test_main_run_child_ttyname():
"""Ensure that CT spawns the child in a pseudo-terminal."""
master, slave = os.openpty()

subprocess.run('./ct ' + get_python_command(CODE_TTYNAME),
check=True,
shell=True,
stdin=slave,
stdout=slave)

assert os.ttyname(slave) not in os.read(master, 1024).decode()

0 comments on commit ad02ea5

Please sign in to comment.