diff --git a/src/sonic-py-common/sonic_py_common/general.py b/src/sonic-py-common/sonic_py_common/general.py index 9e04f3e214ee..4fea165de9fb 100644 --- a/src/sonic-py-common/sonic_py_common/general.py +++ b/src/sonic-py-common/sonic_py_common/general.py @@ -1,4 +1,5 @@ import sys +from subprocess import Popen, STDOUT, PIPE, CalledProcessError, check_output def load_module_from_source(module_name, file_path): @@ -23,3 +24,70 @@ def load_module_from_source(module_name, file_path): sys.modules[module_name] = module return module + + +def getstatusoutput_noshell(cmd): + """ + This function implements getstatusoutput API from subprocess module + but using shell=False to prevent shell injection. + Ref: https://github.com/python/cpython/blob/3.10/Lib/subprocess.py#L602 + """ + try: + output = check_output(cmd, universal_newlines=True, stderr=STDOUT) + exitcode = 0 + except CalledProcessError as ex: + output = ex.output + exitcode = ex.returncode + if output[-1:] == '\n': + output = output[:-1] + return exitcode, output + + +def getstatusoutput_noshell_pipe(cmd0, *args): + """ + This function implements getstatusoutput API from subprocess module + but using shell=False to prevent shell injection. Input command + includes two or more commands connected by shell pipe(s). + """ + popens = [Popen(cmd0, stdout=PIPE, universal_newlines=True)] + i = 0 + while i < len(args): + popens.append(Popen(args[i], stdin=popens[i].stdout, stdout=PIPE, universal_newlines=True)) + popens[i].stdout.close() + i += 1 + output = popens[-1].communicate()[0] + if output[-1:] == '\n': + output = output[:-1] + + exitcodes = [0] * len(popens) + while popens: + last = popens.pop(-1) + exitcodes[len(popens)] = last.wait() + + return (exitcodes, output) + + +def check_output_pipe(cmd0, *args): + """ + This function implements check_output API from subprocess module. + Input command includes two or more commands connected by shell pipe(s) + """ + popens = [Popen(cmd0, stdout=PIPE, universal_newlines=True)] + i = 0 + while i < len(args): + popens.append(Popen(args[i], stdin=popens[i].stdout, stdout=PIPE, universal_newlines=True)) + popens[i].stdout.close() + i += 1 + output = popens[-1].communicate()[0] + + i = 0 + args_list = [cmd0] + list(args) + while popens: + current = popens.pop(0) + exitcode = current.wait() + if exitcode != 0: + raise CalledProcessError(returncode=exitcode, cmd=args_list[i], output=current.stdout) + i += 1 + + return output + diff --git a/src/sonic-py-common/tests/test_general.py b/src/sonic-py-common/tests/test_general.py new file mode 100644 index 000000000000..a395cf9aeb6b --- /dev/null +++ b/src/sonic-py-common/tests/test_general.py @@ -0,0 +1,31 @@ +import sys +import pytest +import subprocess +from sonic_py_common.general import getstatusoutput_noshell, getstatusoutput_noshell_pipe, check_output_pipe + + +def test_getstatusoutput_noshell(tmp_path): + exitcode, output = getstatusoutput_noshell(['echo', 'sonic']) + assert (exitcode, output) == (0, 'sonic') + + exitcode, output = getstatusoutput_noshell([sys.executable, "-c", "import sys; sys.exit(6)"]) + assert exitcode != 0 + +def test_getstatusoutput_noshell_pipe(): + exitcode, output = getstatusoutput_noshell_pipe(['echo', 'sonic'], ['awk', '{print $1}']) + assert (exitcode, output) == ([0, 0], 'sonic') + + exitcode, output = getstatusoutput_noshell_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(8)"]) + assert exitcode == [6, 8] + +def test_check_output_pipe(): + output = check_output_pipe(['echo', 'sonic'], ['awk', '{print $1}']) + assert output == 'sonic\n' + + with pytest.raises(subprocess.CalledProcessError) as e: + check_output_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(0)"]) + assert e.returncode == [6, 0] + + with pytest.raises(subprocess.CalledProcessError) as e: + check_output_pipe([sys.executable, "-c", "import sys; sys.exit(0)"], [sys.executable, "-c", "import sys; sys.exit(6)"]) + assert e.returncode == [0, 6]