Skip to content

Commit

Permalink
add option to acquire a process monitor for a command running on a host
Browse files Browse the repository at this point in the history
  • Loading branch information
shadeofblue committed Jan 13, 2022
1 parent 0209dca commit 80b736a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
15 changes: 13 additions & 2 deletions goth/runner/probe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Optional,
Tuple,
TYPE_CHECKING,
Union,
)

from docker import DockerClient
Expand Down Expand Up @@ -351,7 +352,11 @@ async def run_command_on_host(
command: str,
env: Optional[Mapping[str, str]] = None,
command_timeout: float = 300,
) -> AsyncIterator[Tuple[asyncio.Task, PatternMatchingEventMonitor]]:
get_process_monitor: bool = False,
) -> AsyncIterator[Union[
Tuple[asyncio.Task, PatternMatchingEventMonitor],
Tuple[asyncio.Task, PatternMatchingEventMonitor, process.ProcessMonitor]
]]:
"""Run `command` on host in given `env` and with optional `timeout`.
The command is run in the environment extending `env` with variables needed
Expand All @@ -375,6 +380,8 @@ async def run_command_on_host(
)
cmd_monitor.start()

process_monitor = process.ProcessMonitor()

try:
with monitored_logger(
f"goth.{self.name}.command_output", cmd_monitor
Expand All @@ -387,9 +394,13 @@ async def run_command_on_host(
log_level=logging.INFO,
cmd_logger=cmd_logger,
timeout=command_timeout,
process_monitor=process_monitor,
)
)
yield cmd_task, cmd_monitor
yield_content = [cmd_task, cmd_monitor]
if get_process_monitor:
yield_content.append(process_monitor)
yield yield_content

await cmd_task
logger.debug("Command task has finished")
Expand Down
19 changes: 18 additions & 1 deletion goth/runner/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,26 @@
RUN_COMMAND_DEFAULT_TIMEOUT = 900 # seconds


class ProcessMonitor:
_process: Optional[asyncio.subprocess.Process] = None

async def get_process(self) -> asyncio.subprocess.Process:
while not self._process:
await asyncio.sleep(0.1)
return self._process

def set_process(self, process: asyncio.subprocess.Process):
self._process = process


async def run_command(
args: Sequence[str],
env: Optional[dict] = None,
log_level: Optional[int] = logging.DEBUG,
cmd_logger: Optional[logging.Logger] = None,
log_prefix: Optional[str] = None,
timeout: float = RUN_COMMAND_DEFAULT_TIMEOUT,
process_monitor: Optional[ProcessMonitor] = None,
) -> None:
"""Run a command in a subprocess with timeout and logging.
Expand All @@ -34,6 +47,8 @@ async def run_command(
:param log_prefix: prefix for log lines with command output; ignored if `cmd_logger`
is specified. Default: name of the command
:param timeout: timeout for the command, in seconds. Default: 15 minutes
:param process_monitor: and optional `ProcessMonitor` to which the spawned process will be
reported, so that it can be communicated with from the calling code
"""
logger.info("Running local command: %s", " ".join(args))

Expand All @@ -45,11 +60,13 @@ async def run_command(
log_prefix = f"[{args[0]}] "

async def _run_command():

proc = await asyncio.subprocess.create_subprocess_exec(
*args, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)

if process_monitor:
process_monitor.set_process(proc)

while not proc.stdout.at_eof():
line = await proc.stdout.readline()
cmd_logger.log(log_level, "%s%s", log_prefix, line.decode("utf-8").rstrip())
Expand Down

0 comments on commit 80b736a

Please sign in to comment.