Skip to content

Commit

Permalink
[sonic-py-common] Add getstatusoutput_noshell() functions to general …
Browse files Browse the repository at this point in the history
…module (sonic-net#12065)

Signed-off-by: maipbui <maibui@microsoft.com>
#### Why I did it
`getstatusoutput()` function from `subprocess` module has shell injection issue because it includes `shell=True` in the implementation
Eliminate duplicate code
#### How I did it
Reimplement `getstatusoutput_noshell()` and `getstatusoutput_noshell_pipe()` functions with `shell=False`
Add `check_output_pipe()` function
#### How to verify it
Pass UT
  • Loading branch information
maipbui authored and roberthong-qct committed Nov 18, 2022
1 parent fb9f5ce commit cc922ba
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 0 deletions.
68 changes: 68 additions & 0 deletions src/sonic-py-common/sonic_py_common/general.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
from subprocess import Popen, STDOUT, PIPE, CalledProcessError, check_output


def load_module_from_source(module_name, file_path):
Expand All @@ -23,3 +24,70 @@ def load_module_from_source(module_name, file_path):
sys.modules[module_name] = module

return module


def getstatusoutput_noshell(cmd):
"""
This function implements getstatusoutput API from subprocess module
but using shell=False to prevent shell injection.
Ref: https://github.com/python/cpython/blob/3.10/Lib/subprocess.py#L602
"""
try:
output = check_output(cmd, universal_newlines=True, stderr=STDOUT)
exitcode = 0
except CalledProcessError as ex:
output = ex.output
exitcode = ex.returncode
if output[-1:] == '\n':
output = output[:-1]
return exitcode, output


def getstatusoutput_noshell_pipe(cmd0, *args):
"""
This function implements getstatusoutput API from subprocess module
but using shell=False to prevent shell injection. Input command
includes two or more commands connected by shell pipe(s).
"""
popens = [Popen(cmd0, stdout=PIPE, universal_newlines=True)]
i = 0
while i < len(args):
popens.append(Popen(args[i], stdin=popens[i].stdout, stdout=PIPE, universal_newlines=True))
popens[i].stdout.close()
i += 1
output = popens[-1].communicate()[0]
if output[-1:] == '\n':
output = output[:-1]

exitcodes = [0] * len(popens)
while popens:
last = popens.pop(-1)
exitcodes[len(popens)] = last.wait()

return (exitcodes, output)


def check_output_pipe(cmd0, *args):
"""
This function implements check_output API from subprocess module.
Input command includes two or more commands connected by shell pipe(s)
"""
popens = [Popen(cmd0, stdout=PIPE, universal_newlines=True)]
i = 0
while i < len(args):
popens.append(Popen(args[i], stdin=popens[i].stdout, stdout=PIPE, universal_newlines=True))
popens[i].stdout.close()
i += 1
output = popens[-1].communicate()[0]

i = 0
args_list = [cmd0] + list(args)
while popens:
current = popens.pop(0)
exitcode = current.wait()
if exitcode != 0:
raise CalledProcessError(returncode=exitcode, cmd=args_list[i], output=current.stdout)
i += 1

return output

31 changes: 31 additions & 0 deletions src/sonic-py-common/tests/test_general.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import sys
import pytest
import subprocess
from sonic_py_common.general import getstatusoutput_noshell, getstatusoutput_noshell_pipe, check_output_pipe


def test_getstatusoutput_noshell(tmp_path):
exitcode, output = getstatusoutput_noshell(['echo', 'sonic'])
assert (exitcode, output) == (0, 'sonic')

exitcode, output = getstatusoutput_noshell([sys.executable, "-c", "import sys; sys.exit(6)"])
assert exitcode != 0

def test_getstatusoutput_noshell_pipe():
exitcode, output = getstatusoutput_noshell_pipe(['echo', 'sonic'], ['awk', '{print $1}'])
assert (exitcode, output) == ([0, 0], 'sonic')

exitcode, output = getstatusoutput_noshell_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(8)"])
assert exitcode == [6, 8]

def test_check_output_pipe():
output = check_output_pipe(['echo', 'sonic'], ['awk', '{print $1}'])
assert output == 'sonic\n'

with pytest.raises(subprocess.CalledProcessError) as e:
check_output_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(0)"])
assert e.returncode == [6, 0]

with pytest.raises(subprocess.CalledProcessError) as e:
check_output_pipe([sys.executable, "-c", "import sys; sys.exit(0)"], [sys.executable, "-c", "import sys; sys.exit(6)"])
assert e.returncode == [0, 6]

0 comments on commit cc922ba

Please sign in to comment.