From 99f543c04ae8f409668759129776fbe5303f9b0a Mon Sep 17 00:00:00 2001 From: Michael Milton Date: Wed, 29 Nov 2017 17:45:34 +1100 Subject: [PATCH 1/2] Initial implementation --- plumbum/commands/base.py | 52 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/plumbum/commands/base.py b/plumbum/commands/base.py index 9e40cb3ae..533e7cd05 100644 --- a/plumbum/commands/base.py +++ b/plumbum/commands/base.py @@ -2,6 +2,7 @@ import functools from contextlib import contextmanager from plumbum.commands.processes import run_proc, iter_lines +import plumbum.commands.modifiers from plumbum.lib import six from tempfile import TemporaryFile from subprocess import PIPE, Popen @@ -224,6 +225,57 @@ def run(self, args = (), **kwargs): with self.bgrun(args, **kwargs) as p: return p.run() + def _use_modifier(self, modifier, args): + """ + Applies a modifier to the current object (e.g. FG, NOHUP) + :param modifier: The modifier class to apply (e.g. FG) + :param args: A dictionary of arguments to pass to this modifier + :return: + """ + modifier_instance = modifier(**args) + self & modifier_instance + + def bg(self, **kwargs): + """ + Run this command in the background. Uses all arguments from the BG construct + :py:class: `plumbum.commands.modifiers.BG` + """ + self._use_modifier(plumbum.commands.modifiers.BG, kwargs) + + def fg(self, **kwargs): + """ + Run this command in the background. Uses all arguments from the FG construct + :py:class: `plumbum.commands.modifiers.FG` + """ + self._use_modifier(plumbum.commands.modifiers.FG, kwargs) + + def tee(self, **kwargs): + """ + Run this command in the background. Uses all arguments from the TEE construct + :py:class: `plumbum.commands.modifiers.TEE` + """ + self._use_modifier(plumbum.commands.modifiers.TEE, kwargs) + + def tf(self, **kwargs): + """ + Run this command in the background. Uses all arguments from the TF construct + :py:class: `plumbum.commands.modifiers.TF` + """ + self._use_modifier(plumbum.commands.modifiers.TF, kwargs) + + def retcode(self, **kwargs): + """ + Run this command in the background. Uses all arguments from the RETCODE construct + :py:class: `plumbum.commands.modifiers.RETCODE` + """ + self._use_modifier(plumbum.commands.modifiers.RETCODE, kwargs) + + def nohup(self, **kwargs): + """ + Run this command in the background. Uses all arguments from the NOHUP construct + :py:class: `plumbum.commands.modifiers.NOHUP` + """ + self._use_modifier(plumbum.commands.modifiers.NOHUP, kwargs) class BoundCommand(BaseCommand): __slots__ = ("cmd", "args") From eceed773ffb75b9bd11a107a872f875772f00021 Mon Sep 17 00:00:00 2001 From: Michael Milton Date: Mon, 9 Apr 2018 16:12:40 +1000 Subject: [PATCH 2/2] Added tests; renamed all modifier methods to add "run_" prefix; fixed circular import --- plumbum/commands/base.py | 36 +-- plumbum/commands/modifiers.py | 6 +- tests/test_local.py | 412 ++++++++++++++++++++++++++++++++++ 3 files changed, 433 insertions(+), 21 deletions(-) diff --git a/plumbum/commands/base.py b/plumbum/commands/base.py index 533e7cd05..57f8ae762 100644 --- a/plumbum/commands/base.py +++ b/plumbum/commands/base.py @@ -233,49 +233,49 @@ def _use_modifier(self, modifier, args): :return: """ modifier_instance = modifier(**args) - self & modifier_instance + return self & modifier_instance - def bg(self, **kwargs): + def run_bg(self, **kwargs): """ Run this command in the background. Uses all arguments from the BG construct :py:class: `plumbum.commands.modifiers.BG` """ - self._use_modifier(plumbum.commands.modifiers.BG, kwargs) + return self._use_modifier(plumbum.commands.modifiers.BG, kwargs) - def fg(self, **kwargs): + def run_fg(self, **kwargs): """ - Run this command in the background. Uses all arguments from the FG construct + Run this command in the foreground. Uses all arguments from the FG construct :py:class: `plumbum.commands.modifiers.FG` """ - self._use_modifier(plumbum.commands.modifiers.FG, kwargs) + return self._use_modifier(plumbum.commands.modifiers.FG, kwargs) - def tee(self, **kwargs): + def run_tee(self, **kwargs): """ - Run this command in the background. Uses all arguments from the TEE construct + Run this command using the TEE construct. Inherits all arguments from TEE :py:class: `plumbum.commands.modifiers.TEE` """ - self._use_modifier(plumbum.commands.modifiers.TEE, kwargs) + return self._use_modifier(plumbum.commands.modifiers.TEE, kwargs) - def tf(self, **kwargs): + def run_tf(self, **kwargs): """ - Run this command in the background. Uses all arguments from the TF construct + Run this command using the TF construct. Inherits all arguments from TF :py:class: `plumbum.commands.modifiers.TF` """ - self._use_modifier(plumbum.commands.modifiers.TF, kwargs) + return self._use_modifier(plumbum.commands.modifiers.TF, kwargs) - def retcode(self, **kwargs): + def run_retcode(self, **kwargs): """ - Run this command in the background. Uses all arguments from the RETCODE construct + Run this command using the RETCODE construct. Inherits all arguments from RETCODE :py:class: `plumbum.commands.modifiers.RETCODE` """ - self._use_modifier(plumbum.commands.modifiers.RETCODE, kwargs) + return self._use_modifier(plumbum.commands.modifiers.RETCODE, kwargs) - def nohup(self, **kwargs): + def run_nohup(self, **kwargs): """ - Run this command in the background. Uses all arguments from the NOHUP construct + Run this command using the NOHUP construct. Inherits all arguments from NOHUP :py:class: `plumbum.commands.modifiers.NOHUP` """ - self._use_modifier(plumbum.commands.modifiers.NOHUP, kwargs) + return self._use_modifier(plumbum.commands.modifiers.NOHUP, kwargs) class BoundCommand(BaseCommand): __slots__ = ("cmd", "args") diff --git a/plumbum/commands/modifiers.py b/plumbum/commands/modifiers.py index c370795d6..8ee7f364c 100644 --- a/plumbum/commands/modifiers.py +++ b/plumbum/commands/modifiers.py @@ -6,7 +6,7 @@ from itertools import chain from plumbum.commands.processes import run_proc, ProcessExecutionError -from plumbum.commands.base import AppendingStdoutRedirection, StdoutRedirection +import plumbum.commands.base from plumbum.lib import read_fd_decode_safely @@ -310,11 +310,11 @@ def __init__(self, cwd='.', stdout='nohup.out', stderr=None, append=True): self.append = append def __rand__(self, cmd): - if isinstance(cmd, StdoutRedirection): + if isinstance(cmd, plumbum.commands.base.StdoutRedirection): stdout = cmd.file append = False cmd = cmd.cmd - elif isinstance(cmd, AppendingStdoutRedirection): + elif isinstance(cmd, plumbum.commands.base.AppendingStdoutRedirection): stdout = cmd.file append = True cmd = cmd.cmd diff --git a/tests/test_local.py b/tests/test_local.py index 9aa9e712e..d7d2a8f0b 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -753,6 +753,418 @@ def test_pipeline_stdin(self): future.stdin.write(b'foobar') future.stdin.close() + @skip_on_windows + def test_redirection(self): + from plumbum.cmd import cat, ls, grep, rm + + chain = (ls | grep["\\.py"]) > "tmp.txt" + chain() + + chain2 = (cat < "tmp.txt") | grep["local"] + assert "test_local.py" in chain2().splitlines() + rm("tmp.txt") + + chain3 = (cat << "this is the\nworld of helloness and\nspam bar and eggs") | grep["hello"] + assert "world of helloness and" in chain3().splitlines() + + rc, _, err = (grep["-Zq5"] >= "tmp2.txt").run(["-Zq5"], retcode = None) + assert rc == 2 + assert not err + assert "usage" in (cat < "tmp2.txt")().lower() + rm("tmp2.txt") + + rc, out, _ = (grep["-Zq5"] >= ERROUT).run(["-Zq5"], retcode = None) + assert rc == 2 + assert "usage" in out.lower() + + @skip_on_windows + def test_popen(self): + from plumbum.cmd import ls + + p = ls.popen(["-a"]) + out, _ = p.communicate() + assert p.returncode == 0 + assert "test_local.py" in out.decode(local.encoding).splitlines() + + def test_run(self): + from plumbum.cmd import ls, grep + + rc, out, err = (ls | grep["non_exist1N9"]).run(retcode = 1) + assert rc == 1 + + def test_timeout(self): + from plumbum.cmd import sleep + with pytest.raises(ProcessTimedOut): + sleep(10, timeout = 5) + + @skip_on_windows + def test_pipe_stderr(self, capfd): + from plumbum.cmd import cat, head + cat['/dev/urndom'] & FG(1) + assert 'urndom' in capfd.readouterr()[1] + + assert '' == capfd.readouterr()[1] + + (cat['/dev/urndom'] | head['-c', '10']) & FG(retcode=1) + assert 'urndom' in capfd.readouterr()[1] + + @skip_on_windows + def test_fair_error_attribution(self): + # use LocalCommand directly for predictable argv + false = LocalCommand('false') + true = LocalCommand('true') + with pytest.raises(ProcessExecutionError) as e: + (false | true) & FG + assert e.value.argv == ['false'] + + + @skip_on_windows + def test_iter_lines_timeout(self): + from plumbum.cmd import ping + + with pytest.raises(ProcessTimedOut): + # Order is important on mac + for i, (out, err) in enumerate(ping["-i", 0.5, "127.0.0.1"].popen().iter_lines(timeout=2)): + print("out:", out) + print("err:", err) + assert i > 3 + + + @skip_on_windows + def test_iter_lines_error(self): + from plumbum.cmd import ls + with pytest.raises(ProcessExecutionError) as err: + for i, lines in enumerate(ls["--bla"].popen()): + pass + assert i == 1 + assert (err.value.stderr.startswith("/bin/ls: unrecognized option '--bla'") + or err.value.stderr.startswith("/bin/ls: illegal option -- -")) + + @skip_on_windows + def test_modifiers(self): + from plumbum.cmd import ls, grep + f = (ls["-a"] | grep["\\.py"]) & BG + f.wait() + assert "test_local.py" in f.stdout.splitlines() + + command = (ls["-a"] | grep["local"]) + command_false = (ls["-a"] | grep["not_a_file_here"]) + command & FG + assert command & TF + assert not (command_false & TF) + assert command & RETCODE == 0 + assert command_false & RETCODE == 1 + + @skip_on_windows + @pytest.mark.xfail(reason= + 'This test randomly fails on Mac and PyPy on Travis, not sure why') + def test_tee_modifier(self, capfd): + from plumbum.cmd import echo + + result = echo['This is fun'] & TEE + assert result[1] == 'This is fun\n' + assert 'This is fun\n' == capfd.readouterr()[0] + + def test_arg_expansion(self): + from plumbum.cmd import ls + args = [ '-l', '-F' ] + ls(*args) + ls[args] + + @skip_on_windows + def test_session(self): + sh = local.session() + for _ in range(4): + _, out, _ = sh.run("ls -a") + assert "test_local.py" in out.splitlines() + + sh.run("cd ..") + sh.run("export FOO=17") + out = sh.run("echo $FOO")[1] + assert out.splitlines() == ["17"] + + def test_quoting(self): + ssh = local["ssh"] + pwd = local["pwd"] + + cmd = ssh["localhost", "cd", "/usr", "&&", ssh["localhost", "cd", "/", "&&", + ssh["localhost", "cd", "/bin", "&&", pwd]]] + assert "\"'&&'\"" in " ".join(cmd.formulate(0)) + + ls = local['ls'] + with pytest.raises(ProcessExecutionError) as execinfo: + ls('-a', '') # check that empty strings are rendered correctly + assert execinfo.value.argv[-2:] == ['-a', ''] + + def test_tempdir(self): + from plumbum.cmd import cat + with local.tempdir() as dir: + assert dir.is_dir() + data = six.b("hello world") + with open(str(dir / "test.txt"), "wb") as f: + f.write(data) + with open(str(dir / "test.txt"), "rb") as f: + assert f.read() == data + + assert not dir.exists() + + def test_direct_open_tmpdir(self): + from plumbum.cmd import cat + with local.tempdir() as dir: + assert dir.is_dir() + data = six.b("hello world") + with open(dir / "test.txt", "wb") as f: + f.write(data) + with open(dir / "test.txt", "rb") as f: + assert f.read() == data + + assert not dir.exists() + + + def test_read_write_str(self): + with local.tempdir() as tmp: + data = "hello world" + (tmp / "foo.txt").write(data) + assert (tmp / "foo.txt").read() == data + + def test_read_write_unicode(self): + with local.tempdir() as tmp: + data = six.u("hello world") + (tmp / "foo.txt").write(data) + assert (tmp / "foo.txt").read() == data + + def test_read_write_bin(self): + with local.tempdir() as tmp: + data = six.b("hello world") + (tmp / "foo.txt").write(data) + assert (tmp / "foo.txt").read(mode='rb') == data + + def test_links(self): + with local.tempdir() as tmp: + src = tmp / "foo.txt" + dst1 = tmp / "bar.txt" + dst2 = tmp / "spam.txt" + data = "hello world" + src.write(data) + src.link(dst1) + assert data == dst1.read() + src.symlink(dst2) + assert data == dst2.read() + + def test_list_processes(self): + assert list(local.list_processes()) + + def test_pgrep(self): + assert list(local.pgrep("python")) + + def _generate_sigint(self): + with pytest.raises(KeyboardInterrupt): + if sys.platform == "win32": + from win32api import GenerateConsoleCtrlEvent + GenerateConsoleCtrlEvent(0, 0) # send Ctrl+C to current TTY + else: + os.kill(0, signal.SIGINT) + time.sleep(1) + + @skip_without_tty + @skip_on_windows + def test_same_sesion(self): + from plumbum.cmd import sleep + p = sleep.popen([1000]) + assert p.poll() is None + self._generate_sigint() + time.sleep(1) + assert p.poll() is not None + + @skip_without_tty + def test_new_session(self): + from plumbum.cmd import sleep + p = sleep.popen([1000], new_session = True) + assert p.poll() is None + self._generate_sigint() + time.sleep(1) + assert p.poll() is None + p.terminate() + + def test_local_daemon(self): + from plumbum.cmd import sleep + proc = local.daemonic_popen(sleep[5]) + with pytest.raises(OSError): + os.waitpid(proc.pid, 0) + proc.wait() + + @skip_on_windows + def test_atomic_file(self): + af1 = AtomicFile("tmp.txt") + af2 = AtomicFile("tmp.txt") + af1.write_atomic(six.b("foo")) + af2.write_atomic(six.b("bar")) + assert af1.read_atomic() == six.b("bar") + assert af2.read_atomic() == six.b("bar") + local.path("tmp.txt").delete() + + @skip_on_windows + def test_atomic_file2(self): + af = AtomicFile("tmp.txt") + + code = """from __future__ import with_statement +from plumbum.fs.atomic import AtomicFile +af = AtomicFile("tmp.txt") +try: + with af.locked(blocking = False): + raise ValueError("this should have failed") +except (OSError, IOError): + print("already locked") +""" + with af.locked(): + output = local.python("-c", code) + assert output.strip() == "already locked" + + local.path("tmp.txt").delete() + + @skip_on_windows + def test_pid_file(self): + code = """from __future__ import with_statement +from plumbum.fs.atomic import PidFile, PidFileTaken +try: + with PidFile("mypid"): + raise ValueError("this should have failed") +except PidFileTaken: + print("already locked") +""" + with PidFile("mypid"): + output = local.python("-c", code) + assert output.strip() == "already locked" + + local.path("mypid").delete() + + @skip_on_windows + def test_atomic_counter(self): + local.path("counter").delete() + num_of_procs = 20 + num_of_increments = 20 + + code = """from plumbum.fs.atomic import AtomicCounterFile +import time +time.sleep(0.2) +afc = AtomicCounterFile.open("counter") +for _ in range(%s): + print(afc.next()) + time.sleep(0.1) +""" % (num_of_increments,) + + procs = [] + for _ in range(num_of_procs): + procs.append(local.python["-c", code].popen()) + results = [] + for p in procs: + out, _ = p.communicate() + assert p.returncode == 0 + results.extend(int(num) for num in out.splitlines()) + + assert len(results) == num_of_procs * num_of_increments + assert len(set(results)) == len(results) + assert min(results) == 0 + assert max(results) == num_of_procs * num_of_increments - 1 + local.path("counter").delete() + + @skip_on_windows + def test_atomic_counter2(self): + local.path("counter").delete() + afc = AtomicCounterFile.open("counter") + assert afc.next() == 0 + assert afc.next() == 1 + assert afc.next() == 2 + + with pytest.raises(TypeError): + afc.reset("hello") + + afc.reset(70) + assert afc.next() == 70 + assert afc.next() == 71 + assert afc.next() == 72 + + local.path("counter").delete() + + @skip_on_windows + @pytest.mark.skipif("printenv" not in local, + reason = "printenv is missing") + def test_bound_env(self): + from plumbum.cmd import printenv + + with local.env(FOO = "hello"): + assert printenv.with_env(BAR = "world")("FOO") == "hello\n" + assert printenv.with_env(BAR = "world")("BAR") == "world\n" + assert printenv.with_env(FOO = "sea", BAR = "world")("FOO") == "sea\n" + assert printenv("FOO") == "hello\n" + + def test_nesting_lists_as_argv(self): + from plumbum.cmd import ls + c = ls["-l", ["-a", "*.py"]] + assert c.formulate()[1:] == ['-l', '-a', '*.py'] + + def test_contains(self): + assert "ls" in local + + def test_issue_139(self): + LocalPath(local.cwd) + + def test_pipeline_failure(self): + from plumbum.cmd import ls, head + with pytest.raises(ProcessExecutionError): + (ls["--no-such-option"] | head)() + + def test_pipeline_retcode(self): + "From PR #288" + from plumbum.cmd import echo, grep + print( (echo['one two three four'] | grep['two'] | grep['three'])(retcode=None)) + print( (echo['one two three four'] | grep['five'] | grep['three'])(retcode=None)) + print( (echo['one two three four'] | grep['two'] | grep['five'])(retcode=None)) + print( (echo['one two three four'] | grep['six'] | grep['five'])(retcode=None)) + + def test_pipeline_stdin(self): + from plumbum.cmd import cat + from subprocess import PIPE + with (cat | cat).bgrun(stdin=PIPE) as future: + future.stdin.write(b'foobar') + future.stdin.close() + + def test_run_bg(self): + from plumbum.cmd import ls + f = ls["-a"].run_bg() + f.wait() + assert 'test_local.py' in f.stdout + + def test_run_fg(self, capfd): + from plumbum.cmd import ls + ls["-l"].run_fg() + stdout = capfd.readouterr()[0] + assert 'test_local.py' in stdout + + def test_run_tee(self, capfd): + from plumbum.cmd import ls + f = ls["-l"].run_tee() + stdout = capfd.readouterr()[0] + assert 'test_local.py' in stdout + assert 'test_local.py' in f[1] + + def test_run_tf(self): + from plumbum.cmd import ls + f = ls["-l"].run_tf() + assert f == True + + def test_run_retcode(self): + from plumbum.cmd import ls + f = ls["-l"].run_retcode() + assert f == 0 + + def test_run_nohup(self): + from plumbum.cmd import ls + f = ls["-l"].run_nohup() + f.wait() + assert os.path.exists('nohup.out') + os.unlink('nohup.out') + class TestLocalEncoding: try: richstr = unichr(40960)