diff --git a/tests/integration/modules/test_cmdmod.py b/tests/integration/modules/test_cmdmod.py deleted file mode 100644 index 800111174f08..000000000000 --- a/tests/integration/modules/test_cmdmod.py +++ /dev/null @@ -1,634 +0,0 @@ -import os -import random -import sys -import tempfile -from contextlib import contextmanager - -import pytest - -import salt.utils.path -import salt.utils.platform -import salt.utils.user -from tests.support.case import ModuleCase -from tests.support.helpers import SKIP_INITIAL_PHOTONOS_FAILURES, dedent -from tests.support.runtests import RUNTIME_VARS - -AVAILABLE_PYTHON_EXECUTABLE = salt.utils.path.which_bin( - ["python", "python2", "python2.6", "python2.7"] -) - - -@pytest.mark.windows_whitelisted -class CMDModuleTest(ModuleCase): - """ - Validate the cmd module - """ - - def setUp(self): - self.runas_usr = "nobody" - if salt.utils.platform.is_darwin(): - self.runas_usr = "macsalttest" - - @contextmanager - def _ensure_user_exists(self, name): - if name in self.run_function("user.info", [name]).values(): - # User already exists; don't touch - yield - else: - # Need to create user for test - self.run_function("user.add", [name]) - try: - yield - finally: - self.run_function("user.delete", [name], remove=True) - - @pytest.mark.slow_test - @pytest.mark.skip_on_windows - def test_run(self): - """ - cmd.run - """ - shell = os.environ.get("SHELL") - if shell is None: - # Failed to get the SHELL var, don't run - self.skipTest("Unable to get the SHELL environment variable") - - self.assertTrue(self.run_function("cmd.run", ["echo $SHELL"])) - self.assertEqual( - self.run_function( - "cmd.run", ["echo $SHELL", "shell={}".format(shell)], python_shell=True - ).rstrip(), - shell, - ) - self.assertEqual( - self.run_function("cmd.run", ["ls / | grep etc"], python_shell=True), "etc" - ) - self.assertEqual( - self.run_function( - "cmd.run", - ['echo {{grains.id}} | awk "{print $1}"'], - template="jinja", - python_shell=True, - ), - "minion", - ) - self.assertEqual( - self.run_function( - "cmd.run", ["grep f"], stdin="one\ntwo\nthree\nfour\nfive\n" - ), - "four\nfive", - ) - self.assertEqual( - self.run_function( - "cmd.run", ['echo "a=b" | sed -e s/=/:/g'], python_shell=True - ), - "a:b", - ) - - @pytest.mark.slow_test - def test_stdout(self): - """ - cmd.run_stdout - """ - self.assertEqual( - self.run_function("cmd.run_stdout", ['echo "cheese"']).rstrip(), - "cheese" if not salt.utils.platform.is_windows() else '"cheese"', - ) - - @pytest.mark.slow_test - def test_stderr(self): - """ - cmd.run_stderr - """ - if sys.platform.startswith(("freebsd", "openbsd")): - shell = "/bin/sh" - else: - shell = "/bin/bash" - - self.assertEqual( - self.run_function( - "cmd.run_stderr", - ['echo "cheese" 1>&2', "shell={}".format(shell)], - python_shell=True, - ).rstrip(), - "cheese" if not salt.utils.platform.is_windows() else '"cheese"', - ) - - @pytest.mark.slow_test - def test_run_all(self): - """ - cmd.run_all - """ - if sys.platform.startswith(("freebsd", "openbsd")): - shell = "/bin/sh" - else: - shell = "/bin/bash" - - ret = self.run_function( - "cmd.run_all", - ['echo "cheese" 1>&2', "shell={}".format(shell)], - python_shell=True, - ) - self.assertTrue("pid" in ret) - self.assertTrue("retcode" in ret) - self.assertTrue("stdout" in ret) - self.assertTrue("stderr" in ret) - self.assertTrue(isinstance(ret.get("pid"), int)) - self.assertTrue(isinstance(ret.get("retcode"), int)) - self.assertTrue(isinstance(ret.get("stdout"), str)) - self.assertTrue(isinstance(ret.get("stderr"), str)) - self.assertEqual( - ret.get("stderr").rstrip(), - "cheese" if not salt.utils.platform.is_windows() else '"cheese"', - ) - - @pytest.mark.slow_test - def test_retcode(self): - """ - cmd.retcode - """ - self.assertEqual( - self.run_function("cmd.retcode", ["exit 0"], python_shell=True), 0 - ) - self.assertEqual( - self.run_function("cmd.retcode", ["exit 1"], python_shell=True), 1 - ) - - @pytest.mark.slow_test - def test_run_all_with_success_retcodes(self): - """ - cmd.run with success_retcodes - """ - ret = self.run_function( - "cmd.run_all", ["exit 42"], success_retcodes=[42], python_shell=True - ) - - self.assertTrue("retcode" in ret) - self.assertEqual(ret.get("retcode"), 0) - - @pytest.mark.slow_test - def test_retcode_with_success_retcodes(self): - """ - cmd.run with success_retcodes - """ - ret = self.run_function( - "cmd.retcode", ["exit 42"], success_retcodes=[42], python_shell=True - ) - - self.assertEqual(ret, 0) - - @pytest.mark.slow_test - def test_run_all_with_success_stderr(self): - """ - cmd.run with success_retcodes - """ - random_file = "{}{}{}".format( - RUNTIME_VARS.TMP_ROOT_DIR, os.path.sep, random.random() - ) - - if salt.utils.platform.is_windows(): - func = "type" - expected_stderr = "cannot find the file specified" - else: - func = "cat" - expected_stderr = "No such file or directory" - ret = self.run_function( - "cmd.run_all", - ["{} {}".format(func, random_file)], - success_stderr=[expected_stderr], - python_shell=True, - ) - - self.assertTrue("retcode" in ret) - self.assertEqual(ret.get("retcode"), 0) - - @pytest.mark.slow_test - def test_blacklist_glob(self): - """ - cmd_blacklist_glob - """ - self.assertEqual( - self.run_function("cmd.run", ["bad_command --foo"]).rstrip(), - 'ERROR: The shell command "bad_command --foo" is not permitted', - ) - - @pytest.mark.slow_test - def test_script(self): - """ - cmd.script - """ - args = "saltines crackers biscuits=yes" - script = "salt://script.py" - ret = self.run_function("cmd.script", [script, args], saltenv="base") - self.assertEqual(ret["stdout"], args) - - @pytest.mark.slow_test - def test_script_query_string(self): - """ - cmd.script - """ - args = "saltines crackers biscuits=yes" - script = "salt://script.py?saltenv=base" - ret = self.run_function("cmd.script", [script, args], saltenv="base") - self.assertEqual(ret["stdout"], args) - - @pytest.mark.slow_test - def test_script_retcode(self): - """ - cmd.script_retcode - """ - script = "salt://script.py" - ret = self.run_function("cmd.script_retcode", [script], saltenv="base") - self.assertEqual(ret, 0) - - @pytest.mark.slow_test - def test_script_cwd(self): - """ - cmd.script with cwd - """ - tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) - args = "saltines crackers biscuits=yes" - script = "salt://script.py" - ret = self.run_function( - "cmd.script", [script, args], cwd=tmp_cwd, saltenv="base" - ) - self.assertEqual(ret["stdout"], args) - - @pytest.mark.slow_test - def test_script_cwd_with_space(self): - """ - cmd.script with cwd - """ - tmp_cwd = "{}{}test 2".format( - tempfile.mkdtemp(dir=RUNTIME_VARS.TMP), os.path.sep - ) - os.mkdir(tmp_cwd) - - args = "saltines crackers biscuits=yes" - script = "salt://script.py" - ret = self.run_function( - "cmd.script", [script, args], cwd=tmp_cwd, saltenv="base" - ) - self.assertEqual(ret["stdout"], args) - - @pytest.mark.destructive_test - def test_tty(self): - """ - cmd.tty - """ - for tty in ("tty0", "pts3"): - if os.path.exists(os.path.join("/dev", tty)): - ret = self.run_function("cmd.tty", [tty, "apply salt liberally"]) - self.assertTrue("Success" in ret) - - @pytest.mark.skip_on_windows - @pytest.mark.skip_if_binaries_missing("which") - def test_which(self): - """ - cmd.which - """ - cmd_which = self.run_function("cmd.which", ["cat"]) - self.assertIsInstance(cmd_which, str) - cmd_run = self.run_function("cmd.run", ["which cat"]) - self.assertIsInstance(cmd_run, str) - self.assertEqual(cmd_which.rstrip(), cmd_run.rstrip()) - - @pytest.mark.skip_on_windows - @pytest.mark.skip_if_binaries_missing("which") - def test_which_bin(self): - """ - cmd.which_bin - """ - cmds = ["pip3", "pip2", "pip", "pip-python"] - ret = self.run_function("cmd.which_bin", [cmds]) - self.assertTrue(os.path.split(ret)[1] in cmds) - - @pytest.mark.slow_test - def test_has_exec(self): - """ - cmd.has_exec - """ - self.assertTrue( - self.run_function("cmd.has_exec", [AVAILABLE_PYTHON_EXECUTABLE]) - ) - self.assertFalse( - self.run_function("cmd.has_exec", ["alllfsdfnwieulrrh9123857ygf"]) - ) - - @pytest.mark.slow_test - def test_exec_code(self): - """ - cmd.exec_code - """ - code = dedent( - """ - import sys - sys.stdout.write('cheese') - """ - ) - self.assertEqual( - self.run_function( - "cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code] - ).rstrip(), - "cheese", - ) - - @pytest.mark.slow_test - def test_exec_code_with_single_arg(self): - """ - cmd.exec_code - """ - code = dedent( - """ - import sys - sys.stdout.write(sys.argv[1]) - """ - ) - arg = "cheese" - self.assertEqual( - self.run_function( - "cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code], args=arg - ).rstrip(), - arg, - ) - - @pytest.mark.slow_test - def test_exec_code_with_multiple_args(self): - """ - cmd.exec_code - """ - code = dedent( - """ - import sys - sys.stdout.write(sys.argv[1]) - """ - ) - arg = "cheese" - self.assertEqual( - self.run_function( - "cmd.exec_code", [AVAILABLE_PYTHON_EXECUTABLE, code], args=[arg, "test"] - ).rstrip(), - arg, - ) - - @pytest.mark.slow_test - def test_quotes(self): - """ - cmd.run with quoted command - """ - cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """ - expected_result = 'SELECT * FROM foo WHERE bar="baz"' - if salt.utils.platform.is_windows(): - expected_result = "'SELECT * FROM foo WHERE bar=\"baz\"'" - result = self.run_function("cmd.run_stdout", [cmd]).strip() - self.assertEqual(result, expected_result) - - @pytest.mark.skip_if_not_root - @pytest.mark.skip_on_windows(reason="Skip on Windows, requires password") - def test_quotes_runas(self): - """ - cmd.run with quoted command - """ - cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """ - expected_result = 'SELECT * FROM foo WHERE bar="baz"' - result = self.run_function( - "cmd.run_all", [cmd], runas=RUNTIME_VARS.RUNNING_TESTS_USER - ) - errmsg = "The command returned: {}".format(result) - self.assertEqual(result["retcode"], 0, errmsg) - self.assertEqual(result["stdout"], expected_result, errmsg) - - @pytest.mark.destructive_test - @pytest.mark.skip_if_not_root - @pytest.mark.skip_on_windows(reason="Skip on Windows, uses unix commands") - @pytest.mark.slow_test - def test_avoid_injecting_shell_code_as_root(self): - """ - cmd.run should execute the whole command as the "runas" user, not - running substitutions as root. - """ - cmd = "echo $(id -u)" - - root_id = self.run_function("cmd.run_stdout", [cmd]) - runas_root_id = self.run_function( - "cmd.run_stdout", [cmd], runas=RUNTIME_VARS.RUNNING_TESTS_USER - ) - with self._ensure_user_exists(self.runas_usr): - user_id = self.run_function("cmd.run_stdout", [cmd], runas=self.runas_usr) - - self.assertNotEqual(user_id, root_id) - self.assertNotEqual(user_id, runas_root_id) - self.assertEqual(root_id, runas_root_id) - - @pytest.mark.destructive_test - @pytest.mark.skip_if_not_root - @pytest.mark.skip_on_windows(reason="Skip on Windows, uses unix commands") - @pytest.mark.slow_test - def test_cwd_runas(self): - """ - cmd.run should be able to change working directory correctly, whether - or not runas is in use. - """ - cmd = "pwd" - tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) - os.chmod(tmp_cwd, 0o711) - - cwd_normal = self.run_function("cmd.run_stdout", [cmd], cwd=tmp_cwd).rstrip( - "\n" - ) - self.assertEqual(tmp_cwd, cwd_normal) - - with self._ensure_user_exists(self.runas_usr): - cwd_runas = self.run_function( - "cmd.run_stdout", [cmd], cwd=tmp_cwd, runas=self.runas_usr - ).rstrip("\n") - self.assertEqual(tmp_cwd, cwd_runas) - - @pytest.mark.destructive_test - @pytest.mark.skip_if_not_root - @pytest.mark.skip_unless_on_darwin(reason="Applicable to MacOS only") - @pytest.mark.slow_test - def test_runas_env(self): - """ - cmd.run should be able to change working directory correctly, whether - or not runas is in use. - """ - with self._ensure_user_exists(self.runas_usr): - user_path = self.run_function( - "cmd.run_stdout", ['printf %s "$PATH"'], runas=self.runas_usr - ) - # XXX: Not sure of a better way. Environment starts out with - # /bin:/usr/bin and should be populated by path helper and the bash - # profile. - self.assertNotEqual("/bin:/usr/bin", user_path) - - @pytest.mark.destructive_test - @pytest.mark.skip_if_not_root - @pytest.mark.skip_unless_on_darwin(reason="Applicable to MacOS only") - @pytest.mark.slow_test - def test_runas_complex_command_bad_cwd(self): - """ - cmd.run should not accidentally run parts of a complex command when - given a cwd which cannot be used by the user the command is run as. - - Due to the need to use `su -l` to login to another user on MacOS, we - cannot cd into directories that the target user themselves does not - have execute permission for. To an extent, this test is testing that - buggy behaviour, but its purpose is to ensure that the greater bug of - running commands after failing to cd does not occur. - """ - tmp_cwd = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) - os.chmod(tmp_cwd, 0o700) - - with self._ensure_user_exists(self.runas_usr): - cmd_result = self.run_function( - "cmd.run_all", - ['pwd; pwd; : $(echo "You have failed the test" >&2)'], - cwd=tmp_cwd, - runas=self.runas_usr, - ) - - self.assertEqual("", cmd_result["stdout"]) - self.assertNotIn("You have failed the test", cmd_result["stderr"]) - self.assertNotEqual(0, cmd_result["retcode"]) - - @SKIP_INITIAL_PHOTONOS_FAILURES - @pytest.mark.skip_on_windows - @pytest.mark.skip_if_not_root - @pytest.mark.destructive_test - @pytest.mark.slow_test - def test_runas(self): - """ - Ensure that the env is the runas user's - """ - with self._ensure_user_exists(self.runas_usr): - out = self.run_function( - "cmd.run", ["env"], runas=self.runas_usr - ).splitlines() - self.assertIn("USER={}".format(self.runas_usr), out) - - @pytest.mark.skip_if_binaries_missing("sleep", reason="sleep cmd not installed") - def test_timeout(self): - """ - cmd.run trigger timeout - """ - out = self.run_function( - "cmd.run", ["sleep 2 && echo hello"], f_timeout=1, python_shell=True - ) - self.assertTrue("Timed out" in out) - - @pytest.mark.skip_if_binaries_missing("sleep", reason="sleep cmd not installed") - def test_timeout_success(self): - """ - cmd.run sufficient timeout to succeed - """ - out = self.run_function( - "cmd.run", ["sleep 1 && echo hello"], f_timeout=2, python_shell=True - ) - self.assertEqual(out, "hello") - - @pytest.mark.slow_test - def test_hide_output(self): - """ - Test the hide_output argument - """ - ls_command = ( - ["ls", "/"] if not salt.utils.platform.is_windows() else ["dir", "c:\\"] - ) - - error_command = ["thiscommanddoesnotexist"] - - # cmd.run - out = self.run_function("cmd.run", ls_command, hide_output=True) - self.assertEqual(out, "") - - # cmd.shell - out = self.run_function("cmd.shell", ls_command, hide_output=True) - self.assertEqual(out, "") - - # cmd.run_stdout - out = self.run_function("cmd.run_stdout", ls_command, hide_output=True) - self.assertEqual(out, "") - - # cmd.run_stderr - out = self.run_function("cmd.shell", error_command, hide_output=True) - self.assertEqual(out, "") - - # cmd.run_all (command should have produced stdout) - out = self.run_function("cmd.run_all", ls_command, hide_output=True) - self.assertEqual(out["stdout"], "") - self.assertEqual(out["stderr"], "") - - # cmd.run_all (command should have produced stderr) - out = self.run_function("cmd.run_all", error_command, hide_output=True) - self.assertEqual(out["stdout"], "") - self.assertEqual(out["stderr"], "") - - @pytest.mark.slow_test - def test_cmd_run_whoami(self): - """ - test return of whoami - """ - if not salt.utils.platform.is_windows(): - user = RUNTIME_VARS.RUNTIME_CONFIGS["master"]["user"] - else: - user = salt.utils.user.get_specific_user() - if user.startswith("sudo_"): - user = user.replace("sudo_", "") - cmd = self.run_function("cmd.run", ["whoami"]) - try: - self.assertEqual(user.lower(), cmd.lower()) - except AssertionError as exc: - if not salt.utils.platform.is_windows(): - raise exc from None - if "\\" in user: - user = user.split("\\")[-1] - self.assertEqual(user.lower(), cmd.lower()) - - @pytest.mark.skip_unless_on_windows(reason="Minion is not Windows") - @pytest.mark.slow_test - def test_windows_env_handling(self): - """ - Ensure that nt.environ is used properly with cmd.run* - """ - out = self.run_function( - "cmd.run", ["set"], env={"abc": "123", "ABC": "456"} - ).splitlines() - self.assertIn("abc=123", out) - self.assertIn("ABC=456", out) - - @pytest.mark.slow_test - @pytest.mark.skip_unless_on_windows(reason="Minion is not Windows") - def test_windows_powershell_script_args(self): - """ - Ensure that powershell processes inline script in args - """ - val = "i like cheese" - args = ( - '-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)' - " -ErrorAction Stop".format(val) - ) - script = "salt://issue-56195/test.ps1" - ret = self.run_function( - "cmd.script", [script], args=args, shell="powershell", saltenv="base" - ) - self.assertEqual(ret["stdout"], val) - - @pytest.mark.slow_test - @pytest.mark.skip_unless_on_windows(reason="Minion is not Windows") - @pytest.mark.skip_if_binaries_missing("pwsh") - def test_windows_powershell_script_args_pwsh(self): - """ - Ensure that powershell processes inline script in args with powershell - core - """ - val = "i like cheese" - args = ( - '-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)' - " -ErrorAction Stop".format(val) - ) - script = "salt://issue-56195/test.ps1" - ret = self.run_function( - "cmd.script", [script], args=args, shell="pwsh", saltenv="base" - ) - self.assertEqual(ret["stdout"], val) diff --git a/tests/pytests/functional/modules/test_cmdmod.py b/tests/pytests/functional/modules/test_cmdmod.py new file mode 100644 index 000000000000..95dc88d53a0f --- /dev/null +++ b/tests/pytests/functional/modules/test_cmdmod.py @@ -0,0 +1,561 @@ +import os +import random +import sys +from contextlib import contextmanager + +import pytest + +import salt.config +import salt.utils.path +import salt.utils.platform +import salt.utils.user +from tests.support.helpers import SKIP_INITIAL_PHOTONOS_FAILURES, dedent + +pytestmark = [pytest.mark.windows_whitelisted] + + +@pytest.fixture(scope="module") +def cmdmod(modules): + return modules.cmd + + +@pytest.fixture(scope="module") +def usermod(modules): + return modules.user + + +@pytest.fixture(scope="module") +def available_python_executable(): + yield salt.utils.path.which_bin(["python", "python2", "python2.6", "python2.7"]) + + +@pytest.fixture +def runas_usr(): + runas_usr = "nobody" + if salt.utils.platform.is_darwin(): + runas_usr = "macsalttest" + yield runas_usr + + +@pytest.fixture +def running_username(): + """ + Return the username that is running the code. + """ + return salt.utils.user.get_user() + + +@pytest.fixture +def script_contents(state_tree): + _contents = """ + #!/usr/bin/env python + import sys + print(" ".join(sys.argv[1:])) + """ + + with pytest.helpers.temp_file("script.py", _contents, state_tree): + yield + + +@pytest.fixture +def issue_56195_test_ps1(state_tree): + _contents = """ + [CmdLetBinding()] + Param( + [SecureString] $SecureString + ) + $Credential = New-Object System.Net.NetworkCredential("DummyId", $SecureString) + $Credential.Password + """ + + with pytest.helpers.temp_file("issue_56195_test.ps1", _contents, state_tree): + yield + + +@contextmanager +def _ensure_user_exists(name, usermod): + if name in usermod.info(name).values(): + # User already exists; don't touch + yield + else: + # Need to create user for test + usermod.add(name) + try: + yield + finally: + usermod.delete(name, remove=True) + + +@pytest.mark.slow_test +def test_run(cmdmod): + """ + cmd.run + """ + shell = os.environ.get("SHELL") + if shell is None: + # Failed to get the SHELL var, don't run + pytest.skip("Unable to get the SHELL environment variable") + + assert cmdmod.run("echo $SHELL") + assert cmdmod.run("echo $SHELL", shell=shell, python_shell=True).rstrip() == shell + assert cmdmod.run("ls / | grep etc", python_shell=True) == "etc" + assert ( + cmdmod.run( + 'echo {{grains.id}} | awk "{print $1}"', + template="jinja", + python_shell=True, + ) + == "func-tests-minion" + ) + assert cmdmod.run("grep f", stdin="one\ntwo\nthree\nfour\nfive\n") == "four\nfive" + assert cmdmod.run('echo "a=b" | sed -e s/=/:/g', python_shell=True) == "a:b" + + +@pytest.mark.slow_test +def test_stdout(cmdmod): + """ + cmd.run_stdout + """ + assert ( + cmdmod.run_stdout('echo "cheese"').rstrip() == "cheese" + if not salt.utils.platform.is_windows() + else '"cheese"' + ) + + +@pytest.mark.slow_test +def test_stderr(cmdmod): + """ + cmd.run_stderr + """ + if sys.platform.startswith(("freebsd", "openbsd")): + shell = "/bin/sh" + else: + shell = "/bin/bash" + + assert ( + cmdmod.run_stderr( + 'echo "cheese" 1>&2', + shell=shell, + python_shell=True, + ).rstrip() + == "cheese" + if not salt.utils.platform.is_windows() + else '"cheese"' + ) + + +@pytest.mark.slow_test +def test_run_all(cmdmod): + """ + cmd.run_all + """ + if sys.platform.startswith(("freebsd", "openbsd")): + shell = "/bin/sh" + else: + shell = "/bin/bash" + + ret = cmdmod.run_all( + 'echo "cheese" 1>&2', + shell=shell, + python_shell=True, + ) + assert "pid" in ret + assert "retcode" in ret + assert "stdout" in ret + assert "stderr" in ret + assert isinstance(ret.get("pid"), int) + assert isinstance(ret.get("retcode"), int) + assert isinstance(ret.get("stdout"), str) + assert isinstance(ret.get("stderr"), str) + assert ( + ret.get("stderr").rstrip() == "cheese" + if not salt.utils.platform.is_windows() + else '"cheese"' + ) + + +@pytest.mark.slow_test +def test_retcode(cmdmod): + """ + cmd.retcode + """ + assert cmdmod.retcode("exit 0", python_shell=True) == 0 + assert cmdmod.retcode("exit 1", python_shell=True) == 1 + + +@pytest.mark.slow_test +def test_run_all_with_success_retcodes(cmdmod): + """ + cmd.run with success_retcodes + """ + ret = cmdmod.run_all("exit 42", success_retcodes=[42], python_shell=True) + + assert "retcode" in ret + assert ret.get("retcode") == 0 + + +@pytest.mark.slow_test +def test_retcode_with_success_retcodes(cmdmod): + """ + cmd.run with success_retcodes + """ + ret = cmdmod.retcode("exit 42", success_retcodes=[42], python_shell=True) + + assert ret == 0 + + +@pytest.mark.slow_test +def test_run_all_with_success_stderr(cmdmod, tmp_path): + """ + cmd.run with success_retcodes + """ + random_file = str(tmp_path / f"{random.random()}") + + if salt.utils.platform.is_windows(): + func = "type" + expected_stderr = "cannot find the file specified" + else: + func = "cat" + expected_stderr = "No such file or directory" + ret = cmdmod.run_all( + f"{func} {random_file}", + success_stderr=[expected_stderr], + python_shell=True, + ) + + assert "retcode" in ret + assert ret.get("retcode") == 0 + + +@pytest.mark.slow_test +def test_script(cmdmod, script_contents): + """ + cmd.script + """ + args = "saltines crackers biscuits=yes" + script = "salt://script.py" + ret = cmdmod.script(script, args, saltenv="base") + assert ret["stdout"] == args + + +@pytest.mark.slow_test +def test_script_query_string(cmdmod, script_contents): + """ + cmd.script + """ + args = "saltines crackers biscuits=yes" + script = "salt://script.py?saltenv=base" + ret = cmdmod.script(script, args, saltenv="base") + assert ret["stdout"] == args + + +@pytest.mark.slow_test +def test_script_retcode(cmdmod, script_contents): + """ + cmd.script_retcode + """ + script = "salt://script.py" + ret = cmdmod.script_retcode(script, saltenv="base") + assert ret == 0 + + +@pytest.mark.slow_test +def test_script_cwd(cmdmod, script_contents, tmp_path): + """ + cmd.script with cwd + """ + tmp_cwd = str(tmp_path) + args = "saltines crackers biscuits=yes" + script = "salt://script.py" + ret = cmdmod.script(script, args, cwd=tmp_cwd, saltenv="base") + assert ret["stdout"] == args + + +@pytest.mark.slow_test +def test_script_cwd_with_space(cmdmod, script_contents, tmp_path): + """ + cmd.script with cwd + """ + tmp_cwd = str(tmp_path / "test 2") + os.mkdir(tmp_cwd) + + args = "saltines crackers biscuits=yes" + script = "salt://script.py" + ret = cmdmod.script(script, args, cwd=tmp_cwd, saltenv="base") + assert ret["stdout"] == args + + +@pytest.mark.destructive_test +def test_tty(cmdmod): + """ + cmd.tty + """ + for tty in ("tty0", "pts3"): + if os.path.exists(os.path.join("/dev", tty)): + ret = cmdmod.tty(tty, "apply salt liberally") + assert "Success" in ret + + +@pytest.mark.skip_on_windows +@pytest.mark.skip_if_binaries_missing("which") +def test_which(cmdmod): + """ + cmd.which + """ + cmd_which = cmdmod.which("cat") + assert isinstance(cmd_which, str) + cmd_run = cmdmod.run("which cat") + assert isinstance(cmd_run, str) + assert cmd_which.rstrip() == cmd_run.rstrip() + + +@pytest.mark.skip_on_windows +@pytest.mark.skip_if_binaries_missing("which") +def test_which_bin(cmdmod): + """ + cmd.which_bin + """ + cmds = ["pip3", "pip2", "pip", "pip-python"] + ret = cmdmod.which_bin(cmds) + assert os.path.split(ret)[1] in cmds + + +@pytest.mark.slow_test +def test_has_exec(cmdmod, available_python_executable): + """ + cmd.has_exec + """ + assert cmdmod.has_exec(available_python_executable) + assert not cmdmod.has_exec("alllfsdfnwieulrrh9123857ygf") + + +@pytest.mark.slow_test +def test_exec_code(cmdmod, available_python_executable): + """ + cmd.exec_code + """ + code = dedent( + """ + import sys + sys.stdout.write('cheese') + """ + ) + assert cmdmod.exec_code(available_python_executable, code).rstrip() == "cheese" + + +@pytest.mark.slow_test +def test_exec_code_with_single_arg(cmdmod, available_python_executable): + """ + cmd.exec_code + """ + code = dedent( + """ + import sys + sys.stdout.write(sys.argv[1]) + """ + ) + arg = "cheese" + assert cmdmod.exec_code(available_python_executable, code, args=arg).rstrip() == arg + + +@pytest.mark.slow_test +def test_exec_code_with_multiple_args(cmdmod, available_python_executable): + """ + cmd.exec_code + """ + code = dedent( + """ + import sys + sys.stdout.write(sys.argv[1]) + """ + ) + arg = "cheese" + assert ( + cmdmod.exec_code(available_python_executable, code, args=[arg, "test"]).rstrip() + == arg + ) + + +@pytest.mark.slow_test +def test_quotes(cmdmod): + """ + cmd.run with quoted command + """ + cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """ + expected_result = 'SELECT * FROM foo WHERE bar="baz"' + result = cmdmod.run_stdout(cmd).strip() + assert result == expected_result + + +@pytest.mark.skip_if_not_root +@pytest.mark.skip_on_windows(reason="Skip on Windows, requires password") +def test_quotes_runas(cmdmod, running_username): + """ + cmd.run with quoted command + """ + cmd = """echo 'SELECT * FROM foo WHERE bar="baz"' """ + expected_result = 'SELECT * FROM foo WHERE bar="baz"' + result = cmdmod.run_all(cmd, runas=running_username) + errmsg = f"The command returned: {result}" + assert result["retcode"] == 0, errmsg + assert result["stdout"] == expected_result, errmsg + + +@pytest.mark.destructive_test +@pytest.mark.skip_if_not_root +@pytest.mark.skip_on_windows(reason="Skip on Windows, uses unix commands") +@pytest.mark.slow_test +def test_cwd_runas(cmdmod, usermod, runas_usr, tmp_path): + """ + cmd.run should be able to change working directory correctly, whether + or not runas is in use. + """ + cmd = "pwd" + tmp_cwd = str(tmp_path) + os.chmod(tmp_cwd, 0o711) + + cwd_normal = cmdmod.run_stdout(cmd, cwd=tmp_cwd).rstrip("\n") + assert tmp_cwd == cwd_normal + + with _ensure_user_exists(runas_usr, usermod): + cwd_runas = cmdmod.run_stdout(cmd, cwd=tmp_cwd, runas=runas_usr).rstrip("\n") + assert tmp_cwd == cwd_runas + + +@pytest.mark.destructive_test +@pytest.mark.skip_if_not_root +@pytest.mark.skip_unless_on_darwin(reason="Applicable to MacOS only") +@pytest.mark.slow_test +def test_runas_env(cmdmod, usermod, runas_usr): + """ + cmd.run should be able to change working directory correctly, whether + or not runas is in use. + """ + with _ensure_user_exists(runas_usr, usermod): + user_path = cmdmod.run_stdout('printf %s "$PATH"', runas=runas_usr) + # XXX: Not sure of a better way. Environment starts out with + # /bin:/usr/bin and should be populated by path helper and the bash + # profile. + assert "/bin:/usr/bin" != user_path + + +@pytest.mark.destructive_test +@pytest.mark.skip_if_not_root +@pytest.mark.skip_unless_on_darwin(reason="Applicable to MacOS only") +@pytest.mark.slow_test +def test_runas_complex_command_bad_cwd(cmdmod, usermod, runas_usr, tmp_path): + """ + cmd.run should not accidentally run parts of a complex command when + given a cwd which cannot be used by the user the command is run as. + Due to the need to use `su -l` to login to another user on MacOS, we + cannot cd into directories that the target user themselves does not + have execute permission for. To an extent, this test is testing that + buggy behaviour, but its purpose is to ensure that the greater bug of + running commands after failing to cd does not occur. + """ + tmp_cwd = str(tmp_path) + os.chmod(tmp_cwd, 0o700) + + with _ensure_user_exists(runas_usr, usermod): + cmd_result = cmdmod.run_all( + 'pwd; pwd; : $(echo "You have failed the test" >&2)', + cwd=tmp_cwd, + runas=runas_usr, + ) + + assert "" == cmd_result["stdout"] + assert "You have failed the test" not in cmd_result["stderr"] + assert 0 != cmd_result["retcode"] + + +@SKIP_INITIAL_PHOTONOS_FAILURES +@pytest.mark.skip_on_windows +@pytest.mark.skip_if_not_root +@pytest.mark.destructive_test +@pytest.mark.slow_test +def test_runas(cmdmod, usermod, runas_usr): + """ + Ensure that the env is the runas user's + """ + with _ensure_user_exists(runas_usr, usermod): + out = cmdmod.run("env", runas=runas_usr).splitlines() + assert f"USER={runas_usr}" in out + + +@pytest.mark.skip_if_binaries_missing("sleep", reason="sleep cmd not installed") +def test_timeout(cmdmod): + """ + cmd.run trigger timeout + """ + out = cmdmod.run("sleep 2 && echo hello", timeout=1, python_shell=True) + assert "Timed out" in out + + +@pytest.mark.skip_if_binaries_missing("sleep", reason="sleep cmd not installed") +def test_timeout_success(cmdmod): + """ + cmd.run sufficient timeout to succeed + """ + out = cmdmod.run("sleep 1 && echo hello", timeout=2, python_shell=True) + assert out == "hello" + + +@pytest.mark.slow_test +def test_cmd_run_whoami(cmdmod, running_username): + """ + test return of whoami + """ + if not salt.utils.platform.is_windows(): + user = running_username + else: + user = salt.utils.user.get_specific_user() + if user.startswith("sudo_"): + user = user.replace("sudo_", "") + cmd = cmdmod.run("whoami") + assert user.lower() == cmd.lower() + + +@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows") +@pytest.mark.slow_test +def test_windows_env_handling(cmdmod): + """ + Ensure that nt.environ is used properly with cmd.run* + """ + out = cmdmod.run("set", env={"abc": "123", "ABC": "456"}).splitlines() + assert "abc=123" in out + assert "ABC=456" in out + + +@pytest.mark.slow_test +@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows") +def test_windows_powershell_script_args(cmdmod, issue_56195_test_ps1): + """ + Ensure that powershell processes inline script in args + """ + val = "i like cheese" + args = ( + '-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)' + " -ErrorAction Stop".format(val) + ) + script = "salt://issue_56195_test.ps1" + ret = cmdmod.script(script, args=args, shell="powershell", saltenv="base") + assert ret["stdout"] == val + + +@pytest.mark.slow_test +@pytest.mark.skip_unless_on_windows(reason="Minion is not Windows") +@pytest.mark.skip_if_binaries_missing("pwsh") +def test_windows_powershell_script_args_pwsh(cmdmod, issue_56195_test_ps1): + """ + Ensure that powershell processes inline script in args with powershell + core + """ + val = "i like cheese" + args = ( + '-SecureString (ConvertTo-SecureString -String "{}" -AsPlainText -Force)' + " -ErrorAction Stop".format(val) + ) + script = "salt://issue_56195_test.ps1" + ret = cmdmod.script(script, args=args, shell="pwsh", saltenv="base") + assert ret["stdout"] == val diff --git a/tests/pytests/integration/modules/test_cmdmod.py b/tests/pytests/integration/modules/test_cmdmod.py index 4e8ce5824ee4..d9c326c3f0a2 100644 --- a/tests/pytests/integration/modules/test_cmdmod.py +++ b/tests/pytests/integration/modules/test_cmdmod.py @@ -1,5 +1,11 @@ +import logging + import pytest +import salt.utils.user + +log = logging.getLogger(__name__) + @pytest.fixture(scope="module") def non_root_account(): @@ -7,6 +13,14 @@ def non_root_account(): yield account +@pytest.fixture +def running_username(): + """ + Return the username that is running the code. + """ + return salt.utils.user.get_user() + + @pytest.mark.skip_if_not_root def test_exec_code_all(salt_call_cli, non_root_account): ret = salt_call_cli.run( @@ -22,3 +36,82 @@ def test_long_stdout(salt_cli, salt_minion): ) assert ret.returncode == 0 assert len(ret.data.strip()) == len(echo_str) + + +@pytest.mark.skip_if_not_root +@pytest.mark.skip_on_windows(reason="Skip on Windows, uses unix commands") +def test_avoid_injecting_shell_code_as_root( + salt_call_cli, non_root_account, running_username +): + """ + cmd.run should execute the whole command as the "runas" user, not + running substitutions as root. + """ + cmd = "echo $(id -u)" + + ret = salt_call_cli.run("cmd.run_stdout", cmd) + root_id = ret.json + ret = salt_call_cli.run("cmd.run_stdout", cmd, runas=running_username) + runas_root_id = ret.json + + ret = salt_call_cli.run("cmd.run_stdout", cmd, runas=non_root_account.username) + user_id = ret.json + + assert user_id != root_id + assert user_id != runas_root_id + assert root_id == runas_root_id + + +@pytest.mark.slow_test +def test_blacklist_glob(salt_call_cli): + """ + cmd_blacklist_glob + """ + cmd = "bad_command --foo" + ret = salt_call_cli.run( + "cmd.run", + cmd, + ) + + assert ( + ret.stderr.rstrip() + == "Error running 'cmd.run': The shell command \"bad_command --foo\" is not permitted" + ) + + +@pytest.mark.slow_test +def test_hide_output(salt_call_cli): + """ + Test the hide_output argument + """ + ls_command = ( + ["ls", "/"] if not salt.utils.platform.is_windows() else ["dir", "c:\\"] + ) + + error_command = ["thiscommanddoesnotexist"] + + # cmd.run + ret = salt_call_cli.run("cmd.run", ls_command, hide_output=True) + assert ret.data == "" + + # cmd.shell + ret = salt_call_cli.run("cmd.shell", ls_command, hide_output=True) + assert ret.data == "" + + # cmd.run_stdout + ret = salt_call_cli.run("cmd.run_stdout", ls_command, hide_output=True) + assert ret.data == "" + + # cmd.run_stderr + ret = salt_call_cli.run("cmd.shell", error_command, hide_output=True) + assert ret.data == "" + + # cmd.run_all (command should have produced stdout) + ret = salt_call_cli.run("cmd.run_all", ls_command, hide_output=True) + assert ret.data["stdout"] == "" + assert ret.data["stderr"] == "" + + # cmd.run_all (command should have produced stderr) + ret = salt_call_cli.run("cmd.run_all", error_command, hide_output=True) + assert ret.data["stdout"] == "" + assert ret.data["stderr"] == ""