Skip to content

Commit

Permalink
Enhance process wrapper by providing user input support
Browse files Browse the repository at this point in the history
If mutagen process is stuck for more than 5 seconds with no output, it will display it's output to the user so he can enter input, for example when connection to a new ssh host to accept the new key.

It also provide better error messages when mutagen process fails to execute the command.

Close #5
Close #6
  • Loading branch information
Toilal committed May 24, 2019
1 parent 340e6c4 commit cf9a679
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 23 deletions.
15 changes: 12 additions & 3 deletions mutagen_helper/manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import os
import time

from click import ClickException
from tinydb import TinyDB, Query

from .parser import YamlProjectParser
Expand All @@ -10,7 +12,7 @@
os.environ.get("MUTAGEN_HELPER_HOME", os.path.join(os.path.expanduser("~"), ".mutagen-helper")), "db.json")


class ManagerException(Exception):
class ManagerException(ClickException):
pass


Expand Down Expand Up @@ -42,6 +44,13 @@ def _effective_beta(self, session, project_name):
beta = beta + '/' + project_name
return beta

def _check_session_id(self, db_session_id, timeout=5000):
start_time = int(round(time.time() * 1000))
while not self.wrapper.list(db_session_id):
if int(round(time.time() * 1000)) > start_time + timeout:
return False
return True

def _get_db_session(self, session, project_name, build_if_not_found=False):
if hasattr(session, 'doc_id'):
return session
Expand All @@ -55,7 +64,7 @@ def _get_db_session(self, session, project_name, build_if_not_found=False):
raise ManagerException("Too many matching sessions")
for db_session_candidate in db_sessions:
db_session_id = db_session_candidate.get('session_id')
if not db_session_id or not self.wrapper.list(db_session_id):
if not db_session_id or not self._check_session_id(db_session_id):
# Invalid session in database
logging.warning("Session %s found in mutagen-helper, but doesn't exists in mutagen. "
"Removing from mutagen-helper." % db_session_id)
Expand Down Expand Up @@ -132,7 +141,7 @@ def up_session(self, session, project_name):
'Session %s[%s] (%s) exists, but its configuration for has changed.' % (
project_name, name, session_id))
else:
logging.debug('Session %s[%s] (%s) already exists.' % (project_name, name, session_id))
logging.info('Session %s[%s] (%s) already exists.' % (project_name, name, session_id))
return
else:
logging.debug('No session %s[%s] found.' % (name, project_name))
Expand Down
166 changes: 146 additions & 20 deletions mutagen_helper/wrapper.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
import logging
import os
import re
import shlex
import subprocess
import sys
import time
from queue import Queue, Empty
from threading import Thread

from click import ClickException

mutagen = os.environ.get('MUTAGEN_HELPER_MUTAGEN_BIN', "mutagen.exe" if os.name == 'nt' else "mutagen")


class WrapperException(Exception):
class WrapperException(ClickException):
pass


class MutagenException(WrapperException):
pass


class DaemonNotRunningException(WrapperException):
pass


class MutagenListParser:
def _is_separator_line(self, line):
return line.startswith('-'*10)
return line.startswith('-' * 10)

def parse(self, output: str):
if not output:
Expand Down Expand Up @@ -71,11 +84,129 @@ def parse(self, output: str):
return sessions


class MutagenWrapper:
class ProcessWrapper:
STDIN = 0
STDOUT = 1
STDERR = 2

def run(self, command, print_output=False, print_output_if_idle=5000):
logging.debug('Running command: %s' % shlex.quote(' '.join(command)))

process = subprocess.Popen(command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

def enqueue_output(process, out, type, queue):
while True:
data = out.read()
if data:
queue.put((type, data))
elif process.poll() is not None:
break

def listen_stdin(queue):
for line in sys.stdin.readline():
if process.poll() is not None:
break
queue.put((ProcessWrapper.STDIN, line.encode(sys.stdin.encoding)))

queue = Queue()

thread = Thread(target=enqueue_output, args=(process, process.stdout, ProcessWrapper.STDOUT, queue))
thread.daemon = True
thread.start()

stderr_thread = Thread(target=enqueue_output, args=(process, process.stderr, ProcessWrapper.STDERR, queue))
stderr_thread.daemon = True
stderr_thread.start()

stdin_thread = Thread(target=listen_stdin, args=(queue,))
stdin_thread.daemon = True
stdin_thread.start()

# read line without blocking
stdout = b''
stderr = b''
stdin = b''

recorded = []

last_read_time = int(round(time.time() * 1000))
while True:
try:
stream, data = queue.get_nowait()
recorded.append((stream, data))
last_read_time = int(round(time.time() * 1000))
except Empty:
if process.poll() is not None:
remaining_stdout, remaining_sdterr = process.communicate()
if remaining_stdout:
queue.put(ProcessWrapper.STDOUT, remaining_stdout)
if remaining_sdterr:
queue.put(ProcessWrapper.STDERR, remaining_sdterr)
if not remaining_stdout and not remaining_sdterr:
break

if recorded and int(round(time.time() * 1000)) - last_read_time > print_output_if_idle:
logging.warning("The following mutagen command seems to require your input.")
logging.warning(shlex.quote(' '.join(command)))
logging.warning("Please enter your input if required, or kindly wait for it to terminate.")
print_output = True
for stream, data in recorded:
if stream == ProcessWrapper.STDOUT:
sys.stdout.buffer.write(data)
elif stream == ProcessWrapper.STDERR:
sys.stderr.buffer.write(data)
recorded = []
continue

if stream == ProcessWrapper.STDOUT:
stdout = stdout + data
if print_output:
sys.stdout.buffer.write(data)
elif stream == ProcessWrapper.STDERR:
stderr = stderr + data
if print_output:
sys.stderr.buffer.write(data)
elif stream == ProcessWrapper.STDIN:
stdin = stdin + data
process.stdin.write(data)
process.stdin.flush()

return subprocess.CompletedProcess(process.args, process.returncode,
str(stdout, encoding=sys.stdout.encoding),
str(stderr, encoding=sys.stdout.encoding)
)


class MutagenWrapper(ProcessWrapper):
def __init__(self, mutagen="mutagen.exe" if os.name == 'nt' else "mutagen"):
self.mutagen = mutagen
self.list_parser = MutagenListParser()

def run(self, command, print_output=False, print_output_on_idle=5000):
"""
result = subprocess.run([self.mutagen] + command,
encoding=sys.stdout.encoding,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
"""

result = super().run([self.mutagen] + command, print_output, print_output_on_idle)

if result.returncode == 1 and 'unable to connect to daemon' in result.stderr:
raise DaemonNotRunningException("Mutagen daemon doesn't seems to run. "
"Start the daemon with \"mutagen daemon start\" command and try again.")

if result.returncode != 0:
raise MutagenException("Mutagen has failed to execute a command: " +
(shlex.quote(' '.join([self.mutagen] + command))[1:-1]) +
(os.linesep + result.stdout if result.stdout else '') +
(os.linesep + result.stderr if result.stderr else ''))

return result

def create(self, alpha, beta, options=None):
"""
Creates a new session
Expand All @@ -101,49 +232,44 @@ def create(self, alpha, beta, options=None):
elif isinstance(options, str):
options = shlex.split(options)

result = self.run([self.mutagen, 'create', alpha, beta] + (list(options) if options else []))
command = ['create', alpha, beta] + (list(options) if options else [])
result = self.run(command)
ret = result.stdout
match = re.search('Created session\\s(.*?)\\s', ret)
if match:
return match.group(1)
raise WrapperException("Invalid response: " + ret)

def run(self, command):
return subprocess.run(command, check=True,
encoding=sys.stdout.encoding,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

def terminate(self, session_id=None):
if session_id:
self.run([self.mutagen, 'terminate', session_id])
self.run(['terminate', session_id])
else:
self.run([self.mutagen, 'terminate', '--all'])
self.run(['terminate', '--all'])

def flush(self, session_id=None):
if session_id:
self.run([self.mutagen, 'flush', session_id])
self.run(['flush', session_id])
else:
self.run([self.mutagen, 'flush', '--all'])
self.run(['flush', '--all'])

def pause(self, session_id=None):
if session_id:
self.run([self.mutagen, 'pause', session_id])
self.run(['pause', session_id])
else:
self.run([self.mutagen, 'pause', '--all'])
self.run(['pause', '--all'])

def resume(self, session_id=None):
if session_id:
self.run([self.mutagen, 'resume', session_id])
self.run(['resume', session_id])
else:
self.run([self.mutagen, 'resume', '--all'])
self.run(['resume', '--all'])

def list(self, session_id=None):
try:
if session_id:
result = self.run([self.mutagen, 'list', session_id, '-l'])
result = self.run(['list', session_id, '-l'])
else:
result = self.run([self.mutagen, 'list', '-l'])
result = self.run(['list', '-l'])
except subprocess.CalledProcessError as e:
if e.returncode == 1 and 'unable to locate requested sessions' in e.stderr:
return []
Expand Down

0 comments on commit cf9a679

Please sign in to comment.