From 502a13cad8ca7cacd5573ff9e4b8a75a36ca766f Mon Sep 17 00:00:00 2001 From: Anthony Hayward Date: Mon, 21 Mar 2022 15:33:26 +0000 Subject: [PATCH 1/2] Add a configurable timeout on the zerodeploy close() method --- docs/docs/zerodeploy.rst | 6 +++++- rpyc/utils/zerodeploy.py | 18 ++++++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/docs/docs/zerodeploy.rst b/docs/docs/zerodeploy.rst index 3a1b950d..20b52c98 100644 --- a/docs/docs/zerodeploy.rst +++ b/docs/docs/zerodeploy.rst @@ -114,4 +114,8 @@ under the user's permissions. You can connect as an unprivileged user to make su ``rm -rf /``. Second, it creates an SSH tunnel for the transport, so everything is kept encrypted on the wire. And you get these features for free -- just configuring SSH accounts will do. - +Timeouts +-------- +You can pass a ``timeout`` argument, in seconds, to the ``close()`` method. A ``TimeoutExpired`` is raised if +any subprocess communication takes longer than the timeout, after the subprocess has been told to terminate. By +default, the timeout is four minutes. The timeout prevents a ``close()`` call blocking indefinitely. diff --git a/rpyc/utils/zerodeploy.py b/rpyc/utils/zerodeploy.py index e63d2bbe..26b3dcb7 100644 --- a/rpyc/utils/zerodeploy.py +++ b/rpyc/utils/zerodeploy.py @@ -4,6 +4,7 @@ Requires [plumbum](http://plumbum.readthedocs.org/) """ from __future__ import with_statement +from subprocess import TimeoutExpired import sys import socket # noqa: F401 from rpyc.lib.compat import BYTES_LITERAL @@ -151,27 +152,36 @@ def __enter__(self): def __exit__(self, t, v, tb): self.close() - def close(self): + def close(self, timeout=4*60): if self.proc is not None: try: self.proc.terminate() - self.proc.communicate() + self.proc.communicate(timeout=timeout) + except TimeoutExpired: + self.proc.kill() + raise except Exception: pass self.proc = None if self.tun is not None: try: self.tun._session.proc.terminate() - self.tun._session.proc.communicate() + self.tun._session.proc.communicate(timeout=timeout) self.tun.close() + except TimeoutExpired: + self.tun._session.proc.kill() + raise except Exception: pass self.tun = None if self.remote_machine is not None: try: self.remote_machine._session.proc.terminate() - self.remote_machine._session.proc.communicate() + self.remote_machine._session.proc.communicate(timeout=timeout) self.remote_machine.close() + except TimeoutExpired: + self.remote_machine._session.proc.kill() + raise except Exception: pass self.remote_machine = None From 772605963980ea1439e6080cece3498820687172 Mon Sep 17 00:00:00 2001 From: Anthony Hayward Date: Tue, 26 Apr 2022 15:59:23 +0100 Subject: [PATCH 2/2] Keep default as no timeout, add unit tests --- docs/docs/zerodeploy.rst | 3 ++- rpyc/utils/zerodeploy.py | 2 +- tests/test_deploy.py | 49 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/docs/docs/zerodeploy.rst b/docs/docs/zerodeploy.rst index 20b52c98..5471d72c 100644 --- a/docs/docs/zerodeploy.rst +++ b/docs/docs/zerodeploy.rst @@ -118,4 +118,5 @@ Timeouts -------- You can pass a ``timeout`` argument, in seconds, to the ``close()`` method. A ``TimeoutExpired`` is raised if any subprocess communication takes longer than the timeout, after the subprocess has been told to terminate. By -default, the timeout is four minutes. The timeout prevents a ``close()`` call blocking indefinitely. +default, the timeout is ``None`` i.e. infinite. A timeout value prevents a ``close()`` call blocking +indefinitely. diff --git a/rpyc/utils/zerodeploy.py b/rpyc/utils/zerodeploy.py index 26b3dcb7..b2847764 100644 --- a/rpyc/utils/zerodeploy.py +++ b/rpyc/utils/zerodeploy.py @@ -152,7 +152,7 @@ def __enter__(self): def __exit__(self, t, v, tb): self.close() - def close(self, timeout=4*60): + def close(self, timeout=None): if self.proc is not None: try: self.proc.terminate() diff --git a/tests/test_deploy.py b/tests/test_deploy.py index bb699e6e..063e8051 100644 --- a/tests/test_deploy.py +++ b/tests/test_deploy.py @@ -1,6 +1,9 @@ from __future__ import with_statement + import unittest +import subprocess import sys + from plumbum import SshMachine from plumbum.machines.paramiko_machine import ParamikoMachine from rpyc.utils.zerodeploy import DeployedServer @@ -11,7 +14,6 @@ _paramiko_import_failed = True -@unittest.skipIf(_paramiko_import_failed, "Paramiko is not available") class TestDeploy(unittest.TestCase): def test_deploy(self): rem = SshMachine("localhost") @@ -30,6 +32,51 @@ def test_deploy(self): self.fail("expected an EOFError") rem.close() + def test_close_timeout(self): + expected_timeout = 4 + observed_timeouts = [] + original_communicate = subprocess.Popen.communicate + + def replacement_communicate(self, input=None, timeout=None): + observed_timeouts.append(timeout) + return original_communicate(self, input, timeout) + + try: + subprocess.Popen.communicate = replacement_communicate + rem = SshMachine("localhost") + SshMachine.python = rem[sys.executable] + dep = DeployedServer(rem) + dep.classic_connect() + dep.close(timeout=expected_timeout) + rem.close() + finally: + subprocess.Popen.communicate = original_communicate + # The last three calls to communicate() happen during close(), so check they + # applied the timeout. + assert observed_timeouts[-3:] == [expected_timeout] * 3 + + def test_close_timeout_default_none(self): + observed_timeouts = [] + original_communicate = subprocess.Popen.communicate + + def replacement_communicate(self, input=None, timeout=None): + observed_timeouts.append(timeout) + return original_communicate(self, input, timeout) + + try: + subprocess.Popen.communicate = replacement_communicate + rem = SshMachine("localhost") + SshMachine.python = rem[sys.executable] + dep = DeployedServer(rem) + dep.classic_connect() + dep.close() + rem.close() + finally: + subprocess.Popen.communicate = original_communicate + # No timeout specified, so Popen.communicate should have been called with timeout None. + assert observed_timeouts == [None] * len(observed_timeouts) + + @unittest.skipIf(_paramiko_import_failed, "Paramiko is not available") def test_deploy_paramiko(self): rem = ParamikoMachine("localhost", missing_host_policy=paramiko.AutoAddPolicy()) with DeployedServer(rem) as dep: