Skip to content

Commit

Permalink
Extract pty class and improve cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
angusjfw committed Nov 3, 2022
1 parent 173c158 commit 15a00fd
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 41 deletions.
52 changes: 14 additions & 38 deletions further_link/runner/process_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
import os
import signal
from functools import partial
from pty import openpty
from shlex import split

import aiofiles
from pt_web_vnc.vnc import async_start, async_stop

from ..util.async_helpers import ringbuf_read, timeout
from ..util.id_generator import IdGenerator
from ..util.ipc import async_ipc_send, async_start_ipc_server, ipc_cleanup
from ..util.pty import Pty
from ..util.sdk import get_first_display
from ..util.terminal import set_winsize
from ..util.user_config import (
get_current_user,
get_gid,
Expand Down Expand Up @@ -55,7 +53,7 @@ class InvalidOperation(Exception):

class ProcessHandler:
def __init__(self, user, pty=False):
self.pty = pty
self.use_pty = pty
self.id = id_generator.create()
assert user_exists(user)
self.user = user
Expand All @@ -74,22 +72,15 @@ async def _start(self, command, work_dir=None, env={}, novncOptions={}):

stdio = asyncio.subprocess.PIPE

if self.pty:
if self.use_pty:
logging.debug(f"{self.id} Starting PTY")
# communicate through a pty for terminal 'cooked mode' behaviour
master, slave = openpty()

# on some distros process user must own slave, otherwise you get:
# cannot set terminal process group (-1): Inappropriate ioctl for device
os.chown(slave, get_uid(self.user), get_gid(self.user))

self.pty_master = await aiofiles.open(master, "w+b", 0)
self.pty_slave = await aiofiles.open(slave, "r+b", 0)
self.pty = await Pty.create(self.user)

# set terminal size to a minimum that we display in Further
set_winsize(slave, 4, 60)
self.pty.set_winsize(4, 60)

stdio = self.pty_slave
stdio = self.pty.back

logging.debug(f"{self.id} Setup pty")

Expand Down Expand Up @@ -190,19 +181,15 @@ async def stop(self):
logging.debug(f"{self.id} Could not stop - ProcessLookupError")

async def send_input(self, content):
logging.debug(f"{self.id} Receiving input {content}")

if not self.is_running() or not isinstance(content, str):
logging.debug(f"{self.id} Not receiving input as not running")
raise InvalidOperation()

content_bytes = content.encode("utf-8")

if self.pty:
logging.debug(f"{self.id} Receiving input via pty")
write_task = asyncio.create_task(self.pty_master.write(content_bytes))
if self.use_pty:
write_task = asyncio.create_task(self.pty.front.write(content_bytes))
else:
logging.debug(f"{self.id} Receiving input via stdin")

async def write():
self.process.stdin.write(content_bytes)
Expand All @@ -213,14 +200,12 @@ async def write():
done = await timeout(write_task, 0.1)
if write_task not in done:
logging.debug(f"{self.id} Receiving input timed out")
else:
logging.debug(f"{self.id} Received input")

async def resize_pty(self, rows, cols):
if not self.is_running() or not self.pty:
if not self.is_running() or not self.use_pty:
raise InvalidOperation()

set_winsize(self.pty_slave.fileno(), rows, cols)
self.pty.set_winsize(rows, cols)

async def send_key_event(self, key, event):
if (
Expand All @@ -246,9 +231,9 @@ async def _ipc_communicate(self):

async def _process_communicate(self):
output_tasks = []
if self.pty:
if self.use_pty:
output_tasks.append(
asyncio.create_task(self._handle_output(self.pty_master, "stdout"))
asyncio.create_task(self._handle_output(self.pty.front, "stdout"))
)
else:
output_tasks.append(
Expand Down Expand Up @@ -297,17 +282,8 @@ async def _clean_up(self):
logging.debug(f"{self.id} Starting cleanup")
if getattr(self, "pty", None):
logging.debug(f"{self.id} Cleaning up PTY")
try:
if getattr(self, "pty_master", None):
logging.debug(f"{self.id} Closing PTY master")
await self.pty_master.close()
logging.debug(f"{self.id} Closed PTY master")
if getattr(self, "pty_slave", None):
logging.debug(f"{self.id} Closing PTY slave")
await self.pty_slave.close()
logging.debug(f"{self.id} Closed PTY slave")
except Exception as e:
logging.exception(f"{self.id} PTY Cleanup error: {e}")
await self.pty.clean_up()
self.pty = None

if getattr(self, "novnc", None):
logging.debug(f"{self.id} Cleaning up NOVNC")
Expand Down
4 changes: 4 additions & 0 deletions further_link/util/async_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
# manipulation or os.remove which blocks no more than starting a thread would


async def close(fd):
return await asyncio.to_thread(partial(os.close, fd))


async def exists(path):
return await asyncio.to_thread(partial(os.path.exists, path))

Expand Down
60 changes: 60 additions & 0 deletions further_link/util/pty.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import asyncio
import logging
from pty import openpty

import aiofiles

from ..util.async_files import chown, close
from ..util.async_helpers import timeout
from ..util.terminal import set_winsize
from ..util.user_config import get_gid, get_uid


class Pty:
@classmethod
async def create(cls, user):
self = cls()
self.front_fd, self.back_fd = openpty()

# on some distros process user must own 'back', otherwise you get:
# cannot set terminal process group (-1): Inappropriate ioctl for device
await chown(self.back_fd, get_uid(user), get_gid(user))

self.front = await aiofiles.open(self.front_fd, "w+b", 0)
self.back = await aiofiles.open(self.back_fd, "r+b", 0)

return self

def set_winsize(self, rows, cols):
set_winsize(self.back_fd, 4, 60)

async def write(self, content_bytes):
await self.front.write(content_bytes)

async def clean_up(self):
logging.exception("PTY Closing master")
await self._clean_up_end(self.front, self.front_fd)
logging.exception("PTY Closing slave")
await self._clean_up_end(self.back, self.back_fd)
logging.exception("PTY Cleanup complete")

async def _clean_up_end(self, file, fd):
# aiofiles close sometimes hangs, so use a timeout and try closing the
# fd directly
async def close_end():
try:
logging.debug("PTY Closing file")
await file.close()
except Exception as e:
logging.exception(f"PTY Close error: {e}")

close_task = asyncio.create_task(close_end())
done = await timeout(close_task, 0.1)
if close_task not in done:
logging.debug("PTY Close timed out")

try:
logging.debug("PTY Closing fd")
await close(fd)
except Exception as e:
logging.exception(f"PTY fd close error: {e}")
3 changes: 0 additions & 3 deletions tests/unit/test_process_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from unittest.mock import patch

import pytest
from aiofiles.threadpool.binary import AsyncFileIO
from mock import AsyncMock

from further_link.runner.process_handler import ProcessHandler
Expand Down Expand Up @@ -75,8 +74,6 @@ async def test_pty():
await p.start('python3 -u -c "print(input())"')
assert type(p.process) == Process
assert p.pty
assert type(p.pty_master) == AsyncFileIO
assert type(p.pty_slave) == AsyncFileIO

p.on_start.assert_called()

Expand Down

0 comments on commit 15a00fd

Please sign in to comment.