Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored ps() function in test_posix #1341

Merged
merged 2 commits into from
Sep 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions psutil/tests/test_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -1978,14 +1978,6 @@ def test_cpu_affinity_eligible_cpus(self):
@unittest.skipIf(not LINUX, "LINUX only")
class TestUtils(unittest.TestCase):

def test_open_text(self):
with psutil._psplatform.open_text(__file__) as f:
self.assertEqual(f.mode, 'rt')

def test_open_binary(self):
with psutil._psplatform.open_binary(__file__) as f:
self.assertEqual(f.mode, 'rb')

def test_readlink(self):
with mock.patch("os.readlink", return_value="foo (deleted)") as m:
self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
Expand Down
10 changes: 10 additions & 0 deletions psutil/tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from psutil._common import memoize_when_activated
from psutil._common import supports_ipv6
from psutil._common import wrap_numbers
from psutil._common import open_text
from psutil._common import open_binary
from psutil._compat import PY3
from psutil.tests import APPVEYOR
from psutil.tests import bind_socket
Expand Down Expand Up @@ -896,6 +898,14 @@ def setUp(self):

tearDown = setUp

def test_open_text(self):
with open_text(__file__) as f:
self.assertEqual(f.mode, 'rt')

def test_open_binary(self):
with open_binary(__file__) as f:
self.assertEqual(f.mode, 'rb')

def test_safe_mkdir(self):
safe_mkdir(TESTFN)
assert os.path.isdir(TESTFN)
Expand Down
105 changes: 62 additions & 43 deletions psutil/tests/test_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import os
import re
import subprocess
import sys
import time

import psutil
Expand All @@ -23,7 +22,6 @@
from psutil import OPENBSD
from psutil import POSIX
from psutil import SUNOS
from psutil._compat import PY3
from psutil.tests import APPVEYOR
from psutil.tests import get_kernel_version
from psutil.tests import get_test_subprocess
Expand All @@ -40,23 +38,54 @@
from psutil.tests import which


def ps(cmd):
"""Expects a ps command with a -o argument and parse the result
returning only the value of interest.
def ps(fmt, pid=None):
"""
if not LINUX:
cmd = cmd.replace(" --no-headers ", " ")
Wrapper for calling the ps command with a little bit of cross-platform
support for a narrow range of features.
"""

cmd = ['ps']

if LINUX:
cmd.append('--no-headers')

if pid is not None:
cmd.extend(['-p', str(pid)])
else:
if SUNOS:
cmd.append('-A')
else:
cmd.append('ax')

if SUNOS:
cmd = cmd.replace("-o start", "-o stime")
if AIX:
cmd = cmd.replace("-o rss", "-o rssize")
fmt_map = {'command', 'comm',
'start', 'stime'}
fmt = fmt_map.get(fmt, fmt)

cmd.extend(['-o', fmt])

output = sh(cmd)
if not LINUX:
output = output.split('\n')[1].strip()
try:
return int(output)
except ValueError:
return output

if LINUX:
output = output.splitlines()
else:
output = output.splitlines()[1:]

all_output = []
for line in output:
line = line.strip()

try:
line = int(line)
except ValueError:
pass

all_output.append(line)

if pid is None:
return all_output
else:
return all_output[0]

# ps "-o" field names differ wildly between platforms.
# "comm" means "only executable name" but is not available on BSD platforms.
Expand All @@ -74,14 +103,14 @@ def ps_name(pid):
field = "command"
if SUNOS:
field = "comm"
return ps("ps --no-headers -o %s -p %s" % (field, pid)).split(' ')[0]
return ps(field, pid).split()[0]


def ps_args(pid):
field = "command"
if AIX or SUNOS:
field = "args"
return ps("ps --no-headers -o %s -p %s" % (field, pid))
return ps(field, pid)


@unittest.skipIf(not POSIX, "POSIX only")
Expand All @@ -99,22 +128,22 @@ def tearDownClass(cls):
reap_children()

def test_ppid(self):
ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
ppid_ps = ps('ppid', self.pid)
ppid_psutil = psutil.Process(self.pid).ppid()
self.assertEqual(ppid_ps, ppid_psutil)

def test_uid(self):
uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
uid_ps = ps('uid', self.pid)
uid_psutil = psutil.Process(self.pid).uids().real
self.assertEqual(uid_ps, uid_psutil)

def test_gid(self):
gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
gid_ps = ps('rgid', self.pid)
gid_psutil = psutil.Process(self.pid).gids().real
self.assertEqual(gid_ps, gid_psutil)

def test_username(self):
username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
username_ps = ps('user', self.pid)
username_psutil = psutil.Process(self.pid).username()
self.assertEqual(username_ps, username_psutil)

Expand All @@ -133,7 +162,7 @@ def test_rss_memory(self):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
rss_ps = ps('rss', self.pid)
rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
self.assertEqual(rss_ps, rss_psutil)

Expand All @@ -143,7 +172,7 @@ def test_vsz_memory(self):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
vsz_ps = ps('vsz', self.pid)
vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
self.assertEqual(vsz_ps, vsz_psutil)

Expand Down Expand Up @@ -196,7 +225,7 @@ def test_name_long_cmdline_nsp_exc(self):

@unittest.skipIf(MACOS or BSD, 'ps -o start not available')
def test_create_time(self):
time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
time_ps = ps('start', self.pid)
time_psutil = psutil.Process(self.pid).create_time()
time_psutil_tstamp = datetime.datetime.fromtimestamp(
time_psutil).strftime("%H:%M:%S")
Expand Down Expand Up @@ -235,7 +264,7 @@ def test_cmdline(self):
@unittest.skipIf(SUNOS, "not reliable on SUNOS")
@unittest.skipIf(AIX, "not reliable on AIX")
def test_nice(self):
ps_nice = ps("ps --no-headers -o nice -p %s" % self.pid)
ps_nice = ps('nice', self.pid)
psutil_nice = psutil.Process().nice()
self.assertEqual(ps_nice, psutil_nice)

Expand Down Expand Up @@ -289,30 +318,20 @@ class TestSystemAPIs(unittest.TestCase):
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
if SUNOS or AIX:
cmd = ["ps", "-A", "-o", "pid"]
else:
cmd = ["ps", "ax", "-o", "pid"]
p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
output = p.communicate()[0].strip()
assert p.poll() == 0
if PY3:
output = str(output, sys.stdout.encoding)
pids_ps = []
for line in output.split('\n')[1:]:
if line:
pid = int(line.split()[0].strip())
pids_ps.append(pid)
# remove ps subprocess pid which is supposed to be dead in meantime
pids_ps.remove(p.pid)
pids_ps = ps("pid")
pids_psutil = psutil.pids()
pids_ps.sort()
pids_psutil.sort()

# on MACOS and OPENBSD ps doesn't show pid 0
if MACOS or OPENBSD and 0 not in pids_ps:
pids_ps.insert(0, 0)
self.assertEqual(pids_ps, pids_psutil)

# There will often be one more process in pids_ps for ps itself
if len(pids_ps) - len(pids_psutil) > 1:
difference = [x for x in pids_psutil if x not in pids_ps] + \
[x for x in pids_ps if x not in pids_psutil]
self.fail("difference: " + str(difference))

# for some reason ifconfig -a does not report all interfaces
# returned by psutil
Expand Down