diff --git a/HISTORY.rst b/HISTORY.rst index b09a21057..a4de8c3b1 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -11,6 +11,8 @@ Bug tracker at https://github.com/giampaolo/psutil/issues - #753: [Linux, OSX, Windows] Process USS and PSS (Linux) "real" memory stats. (patch by Eric Rahm) - #755: Process.memory_percent() "memtype" parameter. +- #758: tests now live in psutil namespace. +- #760: expose OS constants (LINUX, OSX, etc.) **Bug fixes** diff --git a/MANIFEST.in b/MANIFEST.in index bddf43dd9..5b1d67c64 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -19,5 +19,4 @@ recursive-exclude docs/_build * recursive-include .ci * recursive-include docs * recursive-include examples *.py -recursive-include psutil *.py *.c *.h -recursive-include test *.py README* +recursive-include psutil *.py *.c *.h README* diff --git a/Makefile b/Makefile index 153a07733..35d592982 100644 --- a/Makefile +++ b/Makefile @@ -4,14 +4,14 @@ # You can set these variables from the command line. PYTHON = python -TSCRIPT = test/test_psutil.py +TSCRIPT = psutil/tests/runner.py all: test clean: rm -f `find . -type f -name \*.py[co]` rm -f `find . -type f -name \*.so` - rm -f `find . -type f -name .\*~` + rm -f `find . -type f -name \*.~` rm -f `find . -type f -name \*.orig` rm -f `find . -type f -name \*.bak` rm -f `find . -type f -name \*.rej` @@ -57,7 +57,7 @@ setup-dev-env: install-git-hooks unittest2 \ install: build - $(PYTHON) setup.py install --user + $(PYTHON) setup.py develop --user uninstall: cd ..; $(PYTHON) -m pip uninstall -y -v psutil diff --git a/make.bat b/make.bat index 35b9fd560..724eba407 100644 --- a/make.bat +++ b/make.bat @@ -23,7 +23,7 @@ if "%PYTHON%" == "" ( set PYTHON=C:\Python27\python.exe ) if "%TSCRIPT%" == "" ( - set TSCRIPT=test\test_psutil.py + set TSCRIPT=psutil\tests\runner.py ) set VSINSTALLDIR=%VS90COMNTOOLS%..\.. diff --git a/psutil/tests/README.rst b/psutil/tests/README.rst new file mode 100644 index 000000000..da498b85b --- /dev/null +++ b/psutil/tests/README.rst @@ -0,0 +1,13 @@ +- The recommended way to run tests (also on Windows) is to cd into psutil root + directory and run ``make test``. + +- Depending on the Python version dependencies for running tests include + ipaddress, mock and unittest2 modules (get them on PYPI. + On Windows also pywin32 and WMIC modules are recommended (although optional). + Run ``make setup-dev-env`` to install them (also on Windows). + +- To run tests on all supported Python version install tox (pip install tox) + then run ``tox``. + +- Every time a commit is pushed tests are automatically run on Travis: + https://travis-ci.org/giampaolo/psutil/ diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py new file mode 100644 index 000000000..71970babe --- /dev/null +++ b/psutil/tests/__init__.py @@ -0,0 +1,594 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +""" +Test utilities. +""" + +import atexit +import contextlib +import errno +import functools +import os +import re +import shutil +import socket +import stat +import subprocess +import sys +import tempfile +import threading +import time +import warnings +from socket import AF_INET +from socket import SOCK_DGRAM +from socket import SOCK_STREAM +try: + import ipaddress # python >= 3.3 +except ImportError: + ipaddress = None +try: + from unittest import mock # py3 +except ImportError: + import mock # NOQA - requires "pip install mock" + +import psutil +from psutil import LINUX +from psutil import OSX +from psutil import POSIX +from psutil import WINDOWS +from psutil._compat import PY3 +from psutil._compat import unicode +from psutil._compat import which + +if sys.version_info < (2, 7): + import unittest2 as unittest # requires "pip install unittest2" +else: + import unittest +if sys.version_info >= (3, 4): + import enum +else: + enum = None + +if PY3: + import importlib + # python <=3.3 + if not hasattr(importlib, 'reload'): + import imp as importlib +else: + import imp as importlib + + +# =================================================================== +# --- Constants +# =================================================================== + +# conf for retry_before_failing() decorator +NO_RETRIES = 10 +# bytes tolerance for OS memory related tests +MEMORY_TOLERANCE = 500 * 1024 # 500KB +# the timeout used in functions which have to wait +GLOBAL_TIMEOUT = 3 + +AF_INET6 = getattr(socket, "AF_INET6") +AF_UNIX = getattr(socket, "AF_UNIX", None) +PYTHON = os.path.realpath(sys.executable) +DEVNULL = open(os.devnull, 'r+') +TESTFN = os.path.join(os.getcwd(), "$testfile") +TESTFN_UNICODE = TESTFN + "ƒőő" +TESTFILE_PREFIX = 'psutil-test-suite-' +if not PY3: + try: + TESTFN_UNICODE = unicode(TESTFN_UNICODE, sys.getfilesystemencoding()) + except UnicodeDecodeError: + TESTFN_UNICODE = TESTFN + "???" + +ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), + '..', '..')) +EXAMPLES_DIR = os.path.join(ROOT_DIR, 'examples') + +WIN_VISTA = (6, 0, 0) if WINDOWS else None +VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil) + if x.startswith('STATUS_')] +# whether we're running this test suite on Travis (https://travis-ci.org/) +TRAVIS = bool(os.environ.get('TRAVIS')) +# whether we're running this test suite on Appveyor for Windows +# (http://www.appveyor.com/) +APPVEYOR = bool(os.environ.get('APPVEYOR')) + +if TRAVIS or 'tox' in sys.argv[0]: + import ipaddress +if TRAVIS or APPVEYOR: + GLOBAL_TIMEOUT = GLOBAL_TIMEOUT * 4 +VERBOSITY = 1 if os.getenv('SILENT') else 2 + + +# =================================================================== +# --- Classes +# =================================================================== + +class ThreadTask(threading.Thread): + """A thread object used for running process thread tests.""" + + def __init__(self): + threading.Thread.__init__(self) + self._running = False + self._interval = None + self._flag = threading.Event() + + def __repr__(self): + name = self.__class__.__name__ + return '<%s running=%s at %#x>' % (name, self._running, id(self)) + + def start(self, interval=0.001): + """Start thread and keep it running until an explicit + stop() request. Polls for shutdown every 'timeout' seconds. + """ + if self._running: + raise ValueError("already started") + self._interval = interval + threading.Thread.start(self) + self._flag.wait() + + def run(self): + self._running = True + self._flag.set() + while self._running: + time.sleep(self._interval) + + def stop(self): + """Stop thread execution and and waits until it is stopped.""" + if not self._running: + raise ValueError("already stopped") + self._running = False + self.join() + + +# =================================================================== +# --- Utility functions +# =================================================================== + +def cleanup(): + reap_children(search_all=True) + safe_remove(TESTFN) + try: + safe_rmdir(TESTFN_UNICODE) + except UnicodeEncodeError: + pass + for path in _testfiles: + safe_remove(path) + +atexit.register(cleanup) +atexit.register(lambda: DEVNULL.close()) + + +_subprocesses_started = set() + + +def get_test_subprocess(cmd=None, wait=False, **kwds): + """Return a subprocess.Popen object to use in tests. + By default stdout and stderr are redirected to /dev/null and the + python interpreter is used as test process. + If 'wait' is True attemps to make sure the process is in a + reasonably initialized state. + """ + if cmd is None: + pyline = "" + if wait: + pyline += "open(r'%s', 'w'); " % TESTFN + # A process living for 30 secs. We sleep N times (as opposed to + # once) in order to be nicer towards Windows which doesn't handle + # interrupt signals properly. + pyline += "import time; [time.sleep(0.01) for x in range(3000)];" + cmd_ = [PYTHON, "-c", pyline] + else: + cmd_ = cmd + kwds.setdefault("stdin", DEVNULL) + kwds.setdefault("stdout", DEVNULL) + kwds.setdefault("stderr", DEVNULL) + sproc = subprocess.Popen(cmd_, **kwds) + if wait: + if cmd is None: + stop_at = time.time() + 3 + while stop_at > time.time(): + if os.path.exists(TESTFN): + break + time.sleep(0.001) + else: + warn("couldn't make sure test file was actually created") + else: + wait_for_pid(sproc.pid) + _subprocesses_started.add(psutil.Process(sproc.pid)) + return sproc + + +_testfiles = [] + + +def pyrun(src): + """Run python code 'src' in a separate interpreter. + Return interpreter subprocess. + """ + if PY3: + src = bytes(src, 'ascii') + with tempfile.NamedTemporaryFile( + prefix=TESTFILE_PREFIX, delete=False) as f: + _testfiles.append(f.name) + f.write(src) + f.flush() + subp = get_test_subprocess([PYTHON, f.name], stdout=None, + stderr=None) + wait_for_pid(subp.pid) + return subp + + +def warn(msg): + """Raise a warning msg.""" + warnings.warn(msg, UserWarning) + + +def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE): + """run cmd in a subprocess and return its output. + raises RuntimeError on error. + """ + p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr) + stdout, stderr = p.communicate() + if p.returncode != 0: + raise RuntimeError(stderr) + if stderr: + warn(stderr) + if PY3: + stdout = str(stdout, sys.stdout.encoding or + sys.getfilesystemencoding()) + return stdout.strip() + + +if POSIX: + def get_kernel_version(): + """Return a tuple such as (2, 6, 36).""" + s = "" + uname = os.uname()[2] + for c in uname: + if c.isdigit() or c == '.': + s += c + else: + break + if not s: + raise ValueError("can't parse %r" % uname) + minor = 0 + micro = 0 + nums = s.split('.') + major = int(nums[0]) + if len(nums) >= 2: + minor = int(nums[1]) + if len(nums) >= 3: + micro = int(nums[2]) + return (major, minor, micro) + + +if LINUX: + RLIMIT_SUPPORT = get_kernel_version() >= (2, 6, 36) +else: + RLIMIT_SUPPORT = False + + +def wait_for_pid(pid, timeout=GLOBAL_TIMEOUT): + """Wait for pid to show up in the process list then return. + Used in the test suite to give time the sub process to initialize. + """ + raise_at = time.time() + timeout + while True: + if pid in psutil.pids(): + # give it one more iteration to allow full initialization + time.sleep(0.01) + return + time.sleep(0.0001) + if time.time() >= raise_at: + raise RuntimeError("Timed out") + + +def wait_for_file(fname, timeout=GLOBAL_TIMEOUT, delete_file=True): + """Wait for a file to be written on disk.""" + stop_at = time.time() + 3 + while time.time() < stop_at: + try: + with open(fname, "r") as f: + data = f.read() + if not data: + continue + if delete_file: + os.remove(fname) + return data + except IOError: + time.sleep(0.001) + raise RuntimeError("timed out (couldn't read file)") + + +def reap_children(search_all=False): + """Kill any subprocess started by this test suite and ensure that + no zombies stick around to hog resources and create problems when + looking for refleaks. + """ + global _subprocesses_started + procs = _subprocesses_started.copy() + if search_all: + this_process = psutil.Process() + for p in this_process.children(recursive=True): + procs.add(p) + for p in procs: + try: + p.terminate() + except psutil.NoSuchProcess: + pass + gone, alive = psutil.wait_procs(procs, timeout=GLOBAL_TIMEOUT) + for p in alive: + warn("couldn't terminate process %s" % p) + try: + p.kill() + except psutil.NoSuchProcess: + pass + _, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT) + if alive: + warn("couldn't not kill processes %s" % str(alive)) + _subprocesses_started = set(alive) + + +def check_ip_address(addr, family): + """Attempts to check IP address's validity.""" + if enum and PY3: + assert isinstance(family, enum.IntEnum), family + if family == AF_INET: + octs = [int(x) for x in addr.split('.')] + assert len(octs) == 4, addr + for num in octs: + assert 0 <= num <= 255, addr + if ipaddress: + if not PY3: + addr = unicode(addr) + ipaddress.IPv4Address(addr) + elif family == AF_INET6: + assert isinstance(addr, str), addr + if ipaddress: + if not PY3: + addr = unicode(addr) + ipaddress.IPv6Address(addr) + elif family == psutil.AF_LINK: + assert re.match('([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr + else: + raise ValueError("unknown family %r", family) + + +def check_connection_ntuple(conn): + """Check validity of a connection namedtuple.""" + valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if + x.startswith('CONN_')] + assert conn[0] == conn.fd + assert conn[1] == conn.family + assert conn[2] == conn.type + assert conn[3] == conn.laddr + assert conn[4] == conn.raddr + assert conn[5] == conn.status + assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type) + assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family) + assert conn.status in valid_conn_states, conn.status + + # check IP address and port sanity + for addr in (conn.laddr, conn.raddr): + if not addr: + continue + if conn.family in (AF_INET, AF_INET6): + assert isinstance(addr, tuple), addr + ip, port = addr + assert isinstance(port, int), port + assert 0 <= port <= 65535, port + check_ip_address(ip, conn.family) + elif conn.family == AF_UNIX: + assert isinstance(addr, (str, None)), addr + else: + raise ValueError("unknown family %r", conn.family) + + if conn.family in (AF_INET, AF_INET6): + # actually try to bind the local socket; ignore IPv6 + # sockets as their address might be represented as + # an IPv4-mapped-address (e.g. "::127.0.0.1") + # and that's rejected by bind() + if conn.family == AF_INET: + s = socket.socket(conn.family, conn.type) + with contextlib.closing(s): + try: + s.bind((conn.laddr[0], 0)) + except socket.error as err: + if err.errno != errno.EADDRNOTAVAIL: + raise + elif conn.family == AF_UNIX: + assert not conn.raddr, repr(conn.raddr) + assert conn.status == psutil.CONN_NONE, conn.status + + if getattr(conn, 'fd', -1) != -1: + assert conn.fd > 0, conn + if hasattr(socket, 'fromfd') and not WINDOWS: + try: + dupsock = socket.fromfd(conn.fd, conn.family, conn.type) + except (socket.error, OSError) as err: + if err.args[0] != errno.EBADF: + raise + else: + with contextlib.closing(dupsock): + assert dupsock.family == conn.family + assert dupsock.type == conn.type + + +def safe_remove(file): + "Convenience function for removing temporary test files" + try: + os.remove(file) + except OSError as err: + if err.errno != errno.ENOENT: + # # file is being used by another process + # if WINDOWS and isinstance(err, WindowsError) and err.errno == 13: + # return + raise + + +def safe_rmdir(dir): + "Convenience function for removing temporary test directories" + try: + os.rmdir(dir) + except OSError as err: + if err.errno != errno.ENOENT: + raise + + +@contextlib.contextmanager +def chdir(dirname): + """Context manager which temporarily changes the current directory.""" + curdir = os.getcwd() + try: + os.chdir(dirname) + yield + finally: + os.chdir(curdir) + + +def call_until(fun, expr, timeout=GLOBAL_TIMEOUT): + """Keep calling function for timeout secs and exit if eval() + expression is True. + """ + stop_at = time.time() + timeout + while time.time() < stop_at: + ret = fun() + if eval(expr): + return ret + time.sleep(0.001) + raise RuntimeError('timed out (ret=%r)' % ret) + + +def retry_before_failing(ntimes=None): + """Decorator which runs a test function and retries N times before + actually failing. + """ + def decorator(fun): + @functools.wraps(fun) + def wrapper(*args, **kwargs): + times = ntimes or NO_RETRIES + assert times, times + exc = None + for x in range(times): + try: + return fun(*args, **kwargs) + except AssertionError as _: + exc = _ + raise exc + return wrapper + return decorator + + +def skip_on_access_denied(only_if=None): + """Decorator to Ignore AccessDenied exceptions.""" + def decorator(fun): + @functools.wraps(fun) + def wrapper(*args, **kwargs): + try: + return fun(*args, **kwargs) + except psutil.AccessDenied: + if only_if is not None: + if not only_if: + raise + msg = "%r was skipped because it raised AccessDenied" \ + % fun.__name__ + raise unittest.SkipTest(msg) + return wrapper + return decorator + + +def skip_on_not_implemented(only_if=None): + """Decorator to Ignore NotImplementedError exceptions.""" + def decorator(fun): + @functools.wraps(fun) + def wrapper(*args, **kwargs): + try: + return fun(*args, **kwargs) + except NotImplementedError: + if only_if is not None: + if not only_if: + raise + msg = "%r was skipped because it raised NotImplementedError" \ + % fun.__name__ + raise unittest.SkipTest(msg) + return wrapper + return decorator + + +def create_temp_executable_file(suffix, code="void main() { pause(); }"): + tmpdir = None + if TRAVIS and OSX: + tmpdir = "/private/tmp" + fd, path = tempfile.mkstemp( + prefix='psu', suffix=suffix, dir=tmpdir) + os.close(fd) + + if which("gcc"): + fd, c_file = tempfile.mkstemp( + prefix='psu', suffix='.c', dir=tmpdir) + os.close(fd) + with open(c_file, "w") as f: + f.write(code) + subprocess.check_call(["gcc", c_file, "-o", path]) + safe_remove(c_file) + else: + # fallback - use python's executable + shutil.copyfile(sys.executable, path) + if POSIX: + st = os.stat(path) + os.chmod(path, st.st_mode | stat.S_IEXEC) + return path + + +if WINDOWS: + def get_winver(): + wv = sys.getwindowsversion() + if hasattr(wv, 'service_pack_major'): # python >= 2.7 + sp = wv.service_pack_major or 0 + else: + r = re.search("\s\d$", wv[4]) + if r: + sp = int(r.group(0)) + else: + sp = 0 + return (wv[0], wv[1], sp) +else: + def get_winver(): + raise NotImplementedError("not a Windows OS") + + +# In Python 3 paths are unicode objects by default. Surrogate escapes +# are used to handle non-character data. +def encode_path(path): + if PY3: + return path.encode(sys.getfilesystemencoding(), + errors="surrogateescape") + else: + return path + + +def decode_path(path): + if PY3: + return path.decode(sys.getfilesystemencoding(), + errors="surrogateescape") + else: + return path + + +def test_module_by_name(name): + # testmodules = [os.path.splitext(x)[0] for x in os.listdir(HERE) + # if x.endswith('.py') and x.startswith('test_')] + name = os.path.splitext(os.path.basename(name))[0] + suite = unittest.TestSuite() + suite.addTest(unittest.defaultTestLoader.loadTestsFromName(name)) + result = unittest.TextTestRunner(verbosity=VERBOSITY).run(suite) + success = result.wasSuccessful() + sys.exit(0 if success else 1) diff --git a/psutil/tests/runner.py b/psutil/tests/runner.py new file mode 100644 index 000000000..7b14212c5 --- /dev/null +++ b/psutil/tests/runner.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +# Copyright (C) 2007-2016 Giampaolo Rodola' . +# Use of this source code is governed by MIT license that can be +# found in the LICENSE file. + +"""Script for running all test files (except memory leaks tests).""" + +import os +import sys + +from pyftpdlib.test import unittest +from pyftpdlib.test import VERBOSITY + + +HERE = os.path.abspath(os.path.dirname(__file__)) +testmodules = [os.path.splitext(x)[0] for x in os.listdir(HERE) + if x.endswith('.py') and x.startswith('test_') and not + x.startswith('test_memory_leaks')] +suite = unittest.TestSuite() +for tm in testmodules: + suite.addTest(unittest.defaultTestLoader.loadTestsFromName(tm)) +result = unittest.TextTestRunner(verbosity=VERBOSITY).run(suite) +success = result.wasSuccessful() +sys.exit(0 if success else 1) diff --git a/test/_bsd.py b/psutil/tests/test_bsd.py similarity index 92% rename from test/_bsd.py rename to psutil/tests/test_bsd.py index 3a2458d66..5d93f314a 100644 --- a/test/_bsd.py +++ b/psutil/tests/test_bsd.py @@ -6,9 +6,9 @@ # TODO: (FreeBSD) add test for comparing connections with 'sockstat' cmd. -"""Tests specific to all BSD platforms. These are implicitly run by -test_psutil. -py.""" + +"""Tests specific to all BSD platforms.""" + import datetime import os @@ -22,13 +22,14 @@ from psutil import NETBSD from psutil import OPENBSD from psutil._compat import PY3 -from test_psutil import get_test_subprocess -from test_psutil import MEMORY_TOLERANCE -from test_psutil import reap_children -from test_psutil import retry_before_failing -from test_psutil import sh -from test_psutil import unittest -from test_psutil import which +from psutil.tests import get_test_subprocess +from psutil.tests import MEMORY_TOLERANCE +from psutil.tests import reap_children +from psutil.tests import retry_before_failing +from psutil.tests import sh +from psutil.tests import test_module_by_name +from psutil.tests import unittest +from psutil.tests import which PAGESIZE = os.sysconf("SC_PAGE_SIZE") @@ -293,16 +294,5 @@ def test_boot_time(self): self.assertEqual(sys_bt, psutil_bt) -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) - if FREEBSD: - test_suite.addTest(unittest.makeSuite(FreeBSDSpecificTestCase)) - elif OPENBSD: - test_suite.addTest(unittest.makeSuite(OpenBSDSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - if __name__ == '__main__': - if not main(): - sys.exit(1) + test_module_by_name(__file__) diff --git a/test/_linux.py b/psutil/tests/test_linux.py similarity index 96% rename from test/_linux.py rename to psutil/tests/test_linux.py index 45d61a5ff..eb5ab9b4a 100644 --- a/test/_linux.py +++ b/psutil/tests/test_linux.py @@ -4,9 +4,7 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -"""Linux specific tests. These are implicitly run by test_psutil.py.""" - -from __future__ import division +"""Linux specific tests.""" import contextlib import errno @@ -18,7 +16,6 @@ import shutil import socket import struct -import sys import tempfile import textwrap import time @@ -34,19 +31,21 @@ from psutil import LINUX from psutil._compat import PY3 from psutil._compat import u -from test_psutil import call_until -from test_psutil import get_kernel_version -from test_psutil import importlib -from test_psutil import MEMORY_TOLERANCE -from test_psutil import pyrun -from test_psutil import reap_children -from test_psutil import retry_before_failing -from test_psutil import sh -from test_psutil import skip_on_not_implemented -from test_psutil import TESTFN -from test_psutil import TRAVIS -from test_psutil import unittest -from test_psutil import which +from psutil.tests import call_until +from psutil.tests import get_kernel_version +from psutil.tests import importlib +from psutil.tests import MEMORY_TOLERANCE +from psutil.tests import pyrun +from psutil.tests import reap_children +from psutil.tests import retry_before_failing +from psutil.tests import sh +from psutil.tests import skip_on_not_implemented +from psutil.tests import TESTFN +from psutil.tests import TRAVIS +from psutil.tests import unittest +from psutil.tests import which +from psutil.tests import test_module_by_name + HERE = os.path.abspath(os.path.dirname(__file__)) # procps-ng 3.3.10 changed the output format of free @@ -620,12 +619,5 @@ def test_memory_addrspace_info(self): mem.swap, sum([x.swap for x in maps])) -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - if __name__ == '__main__': - if not main(): - sys.exit(1) + test_module_by_name(__file__) diff --git a/test/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py similarity index 95% rename from test/test_memory_leaks.py rename to psutil/tests/test_memory_leaks.py index 200783731..dce70eab8 100644 --- a/test/test_memory_leaks.py +++ b/psutil/tests/test_memory_leaks.py @@ -18,29 +18,26 @@ import threading import time + import psutil import psutil._common +from psutil import FREEBSD +from psutil import LINUX +from psutil import OPENBSD +from psutil import OSX +from psutil import POSIX +from psutil import SUNOS +from psutil import WINDOWS +from psutil._common import supports_ipv6 from psutil._compat import callable from psutil._compat import xrange -from test_psutil import FREEBSD -from test_psutil import get_test_subprocess -from test_psutil import LINUX -from test_psutil import OPENBSD -from test_psutil import OSX -from test_psutil import POSIX -from test_psutil import reap_children -from test_psutil import RLIMIT_SUPPORT -from test_psutil import safe_remove -from test_psutil import SUNOS -from test_psutil import supports_ipv6 -from test_psutil import TESTFN -from test_psutil import TRAVIS -from test_psutil import WINDOWS - -if sys.version_info < (2, 7): - import unittest2 as unittest # https://pypi.python.org/pypi/unittest2 -else: - import unittest +from psutil.tests import get_test_subprocess +from psutil.tests import reap_children +from psutil.tests import RLIMIT_SUPPORT +from psutil.tests import safe_remove +from psutil.tests import TESTFN +from psutil.tests import TRAVIS +from psutil.tests import unittest LOOPS = 1000 diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py new file mode 100644 index 000000000..f53715f32 --- /dev/null +++ b/psutil/tests/test_misc.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +import ast +import errno +import imp +import json +import os +import pickle +import psutil +import socket +import stat +import sys + +from psutil import LINUX +from psutil import NETBSD +from psutil import OPENBSD +from psutil import POSIX +from psutil import WINDOWS +from psutil._common import supports_ipv6 +from psutil.tests import APPVEYOR +from psutil.tests import EXAMPLES_DIR +from psutil.tests import importlib +from psutil.tests import mock +from psutil.tests import ROOT_DIR +from psutil.tests import sh +from psutil.tests import test_module_by_name +from psutil.tests import TRAVIS +from psutil.tests import unittest + + +# =================================================================== +# --- Misc tests +# =================================================================== + +class TestMisc(unittest.TestCase): + """Misc / generic tests.""" + + def test_process__repr__(self, func=repr): + p = psutil.Process() + r = func(p) + self.assertIn("psutil.Process", r) + self.assertIn("pid=%s" % p.pid, r) + self.assertIn("name=", r) + self.assertIn(p.name(), r) + with mock.patch.object(psutil.Process, "name", + side_effect=psutil.ZombieProcess(os.getpid())): + p = psutil.Process() + r = func(p) + self.assertIn("pid=%s" % p.pid, r) + self.assertIn("zombie", r) + self.assertNotIn("name=", r) + with mock.patch.object(psutil.Process, "name", + side_effect=psutil.NoSuchProcess(os.getpid())): + p = psutil.Process() + r = func(p) + self.assertIn("pid=%s" % p.pid, r) + self.assertIn("terminated", r) + self.assertNotIn("name=", r) + with mock.patch.object(psutil.Process, "name", + side_effect=psutil.AccessDenied(os.getpid())): + p = psutil.Process() + r = func(p) + self.assertIn("pid=%s" % p.pid, r) + self.assertNotIn("name=", r) + + def test_process__str__(self): + self.test_process__repr__(func=str) + + def test_no_such_process__repr__(self, func=repr): + self.assertEqual( + repr(psutil.NoSuchProcess(321)), + "psutil.NoSuchProcess process no longer exists (pid=321)") + self.assertEqual( + repr(psutil.NoSuchProcess(321, name='foo')), + "psutil.NoSuchProcess process no longer exists (pid=321, " + "name='foo')") + self.assertEqual( + repr(psutil.NoSuchProcess(321, msg='foo')), + "psutil.NoSuchProcess foo") + + def test_zombie_process__repr__(self, func=repr): + self.assertEqual( + repr(psutil.ZombieProcess(321)), + "psutil.ZombieProcess process still exists but it's a zombie " + "(pid=321)") + self.assertEqual( + repr(psutil.ZombieProcess(321, name='foo')), + "psutil.ZombieProcess process still exists but it's a zombie " + "(pid=321, name='foo')") + self.assertEqual( + repr(psutil.ZombieProcess(321, name='foo', ppid=1)), + "psutil.ZombieProcess process still exists but it's a zombie " + "(pid=321, name='foo', ppid=1)") + self.assertEqual( + repr(psutil.ZombieProcess(321, msg='foo')), + "psutil.ZombieProcess foo") + + def test_access_denied__repr__(self, func=repr): + self.assertEqual( + repr(psutil.AccessDenied(321)), + "psutil.AccessDenied (pid=321)") + self.assertEqual( + repr(psutil.AccessDenied(321, name='foo')), + "psutil.AccessDenied (pid=321, name='foo')") + self.assertEqual( + repr(psutil.AccessDenied(321, msg='foo')), + "psutil.AccessDenied foo") + + def test_timeout_expired__repr__(self, func=repr): + self.assertEqual( + repr(psutil.TimeoutExpired(321)), + "psutil.TimeoutExpired timeout after 321 seconds") + self.assertEqual( + repr(psutil.TimeoutExpired(321, pid=111)), + "psutil.TimeoutExpired timeout after 321 seconds (pid=111)") + self.assertEqual( + repr(psutil.TimeoutExpired(321, pid=111, name='foo')), + "psutil.TimeoutExpired timeout after 321 seconds " + "(pid=111, name='foo')") + + def test_process__eq__(self): + p1 = psutil.Process() + p2 = psutil.Process() + self.assertEqual(p1, p2) + p2._ident = (0, 0) + self.assertNotEqual(p1, p2) + self.assertNotEqual(p1, 'foo') + + def test_process__hash__(self): + s = set([psutil.Process(), psutil.Process()]) + self.assertEqual(len(s), 1) + + def test__all__(self): + dir_psutil = dir(psutil) + for name in dir_psutil: + if name in ('callable', 'error', 'namedtuple', + 'long', 'test', 'NUM_CPUS', 'BOOT_TIME', + 'TOTAL_PHYMEM'): + continue + if not name.startswith('_'): + try: + __import__(name) + except ImportError: + if name not in psutil.__all__: + fun = getattr(psutil, name) + if fun is None: + continue + if (fun.__doc__ is not None and + 'deprecated' not in fun.__doc__.lower()): + self.fail('%r not in psutil.__all__' % name) + + # Import 'star' will break if __all__ is inconsistent, see: + # https://github.com/giampaolo/psutil/issues/656 + # Can't do `from psutil import *` as it won't work on python 3 + # so we simply iterate over __all__. + for name in psutil.__all__: + self.assertIn(name, dir_psutil) + + def test_version(self): + self.assertEqual('.'.join([str(x) for x in psutil.version_info]), + psutil.__version__) + + def test_memoize(self): + from psutil._common import memoize + + @memoize + def foo(*args, **kwargs): + "foo docstring" + calls.append(None) + return (args, kwargs) + + calls = [] + # no args + for x in range(2): + ret = foo() + expected = ((), {}) + self.assertEqual(ret, expected) + self.assertEqual(len(calls), 1) + # with args + for x in range(2): + ret = foo(1) + expected = ((1, ), {}) + self.assertEqual(ret, expected) + self.assertEqual(len(calls), 2) + # with args + kwargs + for x in range(2): + ret = foo(1, bar=2) + expected = ((1, ), {'bar': 2}) + self.assertEqual(ret, expected) + self.assertEqual(len(calls), 3) + # clear cache + foo.cache_clear() + ret = foo() + expected = ((), {}) + self.assertEqual(ret, expected) + self.assertEqual(len(calls), 4) + # docstring + self.assertEqual(foo.__doc__, "foo docstring") + + def test_parse_environ_block(self): + from psutil._common import parse_environ_block + + def k(s): + return s.upper() if WINDOWS else s + + self.assertEqual(parse_environ_block("a=1\0"), + {k("a"): "1"}) + self.assertEqual(parse_environ_block("a=1\0b=2\0\0"), + {k("a"): "1", k("b"): "2"}) + self.assertEqual(parse_environ_block("a=1\0b=\0\0"), + {k("a"): "1", k("b"): ""}) + # ignore everything after \0\0 + self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"), + {k("a"): "1", k("b"): "2"}) + # ignore everything that is not an assignment + self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"}) + self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"}) + # do not fail if the block is incomplete + self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"}) + + def test_supports_ipv6(self): + if supports_ipv6(): + with mock.patch('psutil._common.socket') as s: + s.has_ipv6 = False + assert not supports_ipv6() + with mock.patch('psutil._common.socket.socket', + side_effect=socket.error) as s: + assert not supports_ipv6() + assert s.called + with mock.patch('psutil._common.socket.socket', + side_effect=socket.gaierror) as s: + assert not supports_ipv6() + assert s.called + with mock.patch('psutil._common.socket.socket.bind', + side_effect=socket.gaierror) as s: + assert not supports_ipv6() + assert s.called + else: + if hasattr(socket, 'AF_INET6'): + with self.assertRaises(Exception): + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.bind(("::1", 0)) + + def test_isfile_strict(self): + from psutil._common import isfile_strict + this_file = os.path.abspath(__file__) + assert isfile_strict(this_file) + assert not isfile_strict(os.path.dirname(this_file)) + with mock.patch('psutil._common.os.stat', + side_effect=OSError(errno.EPERM, "foo")): + self.assertRaises(OSError, isfile_strict, this_file) + with mock.patch('psutil._common.os.stat', + side_effect=OSError(errno.EACCES, "foo")): + self.assertRaises(OSError, isfile_strict, this_file) + with mock.patch('psutil._common.os.stat', + side_effect=OSError(errno.EINVAL, "foo")): + assert not isfile_strict(this_file) + with mock.patch('psutil._common.stat.S_ISREG', return_value=False): + assert not isfile_strict(this_file) + + def test_serialization(self): + def check(ret): + if json is not None: + json.loads(json.dumps(ret)) + a = pickle.dumps(ret) + b = pickle.loads(a) + self.assertEqual(ret, b) + + check(psutil.Process().as_dict()) + check(psutil.virtual_memory()) + check(psutil.swap_memory()) + check(psutil.cpu_times()) + check(psutil.cpu_times_percent(interval=0)) + check(psutil.net_io_counters()) + if LINUX and not os.path.exists('/proc/diskstats'): + pass + else: + if not APPVEYOR: + check(psutil.disk_io_counters()) + check(psutil.disk_partitions()) + check(psutil.disk_usage(os.getcwd())) + check(psutil.users()) + + def test_setup_script(self): + setup_py = os.path.join(ROOT_DIR, 'setup.py') + module = imp.load_source('setup', setup_py) + self.assertRaises(SystemExit, module.setup) + self.assertEqual(module.get_version(), psutil.__version__) + + def test_ad_on_process_creation(self): + # We are supposed to be able to instantiate Process also in case + # of zombie processes or access denied. + with mock.patch.object(psutil.Process, 'create_time', + side_effect=psutil.AccessDenied) as meth: + psutil.Process() + assert meth.called + with mock.patch.object(psutil.Process, 'create_time', + side_effect=psutil.ZombieProcess(1)) as meth: + psutil.Process() + assert meth.called + with mock.patch.object(psutil.Process, 'create_time', + side_effect=ValueError) as meth: + with self.assertRaises(ValueError): + psutil.Process() + assert meth.called + + def test_psutil_is_reloadable(self): + importlib.reload(psutil) + + +# =================================================================== +# --- Example script tests +# =================================================================== + +class TestExampleScripts(unittest.TestCase): + """Tests for scripts in the examples directory.""" + + def assert_stdout(self, exe, args=None): + exe = os.path.join(EXAMPLES_DIR, exe) + if args: + exe = exe + ' ' + args + try: + out = sh(sys.executable + ' ' + exe).strip() + except RuntimeError as err: + if 'AccessDenied' in str(err): + return str(err) + else: + raise + assert out, out + return out + + def assert_syntax(self, exe, args=None): + exe = os.path.join(EXAMPLES_DIR, exe) + with open(exe, 'r') as f: + src = f.read() + ast.parse(src) + + def test_check_presence(self): + # make sure all example scripts have a test method defined + meths = dir(self) + for name in os.listdir(EXAMPLES_DIR): + if name.endswith('.py'): + if 'test_' + os.path.splitext(name)[0] not in meths: + # self.assert_stdout(name) + self.fail('no test defined for %r script' + % os.path.join(EXAMPLES_DIR, name)) + + @unittest.skipUnless(POSIX, "UNIX only") + def test_executable(self): + for name in os.listdir(EXAMPLES_DIR): + if name.endswith('.py'): + path = os.path.join(EXAMPLES_DIR, name) + if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]: + self.fail('%r is not executable' % path) + + def test_disk_usage(self): + self.assert_stdout('disk_usage.py') + + def test_free(self): + self.assert_stdout('free.py') + + def test_meminfo(self): + self.assert_stdout('meminfo.py') + + def test_process_detail(self): + self.assert_stdout('process_detail.py') + + @unittest.skipIf(APPVEYOR, "can't find users on Appveyor") + def test_who(self): + self.assert_stdout('who.py') + + def test_ps(self): + self.assert_stdout('ps.py') + + def test_pstree(self): + self.assert_stdout('pstree.py') + + def test_netstat(self): + self.assert_stdout('netstat.py') + + @unittest.skipIf(TRAVIS, "permission denied on travis") + def test_ifconfig(self): + self.assert_stdout('ifconfig.py') + + @unittest.skipIf(OPENBSD or NETBSD, "memory maps not supported") + def test_pmap(self): + self.assert_stdout('pmap.py', args=str(os.getpid())) + + @unittest.skipIf(ast is None, + 'ast module not available on this python version') + def test_killall(self): + self.assert_syntax('killall.py') + + @unittest.skipIf(ast is None, + 'ast module not available on this python version') + def test_nettop(self): + self.assert_syntax('nettop.py') + + @unittest.skipIf(ast is None, + 'ast module not available on this python version') + def test_top(self): + self.assert_syntax('top.py') + + @unittest.skipIf(ast is None, + 'ast module not available on this python version') + def test_iotop(self): + self.assert_syntax('iotop.py') + + def test_pidof(self): + output = self.assert_stdout('pidof.py %s' % psutil.Process().name()) + self.assertIn(str(os.getpid()), output) + + +if __name__ == '__main__': + test_module_by_name(__file__) diff --git a/test/_osx.py b/psutil/tests/test_osx.py similarity index 90% rename from test/_osx.py rename to psutil/tests/test_osx.py index feed396d6..192f960f9 100644 --- a/test/_osx.py +++ b/psutil/tests/test_osx.py @@ -4,7 +4,7 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -"""OSX specific tests. These are implicitly run by test_psutil.py.""" +"""OSX specific tests.""" import os import re @@ -15,13 +15,14 @@ import psutil from psutil import OSX from psutil._compat import PY3 -from test_psutil import get_test_subprocess -from test_psutil import MEMORY_TOLERANCE -from test_psutil import reap_children -from test_psutil import retry_before_failing -from test_psutil import sh -from test_psutil import TRAVIS -from test_psutil import unittest +from psutil.tests import get_test_subprocess +from psutil.tests import MEMORY_TOLERANCE +from psutil.tests import reap_children +from psutil.tests import retry_before_failing +from psutil.tests import sh +from psutil.tests import test_module_by_name +from psutil.tests import TRAVIS +from psutil.tests import unittest PAGESIZE = os.sysconf("SC_PAGE_SIZE") @@ -188,13 +189,5 @@ def test_swapmem_total(self): self.assertEqual(psutil_smem.free, human2bytes(free)) -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - - if __name__ == '__main__': - if not main(): - sys.exit(1) + test_module_by_name(__file__) diff --git a/test/_posix.py b/psutil/tests/test_posix.py similarity index 93% rename from test/_posix.py rename to psutil/tests/test_posix.py index 644e44670..a30a25109 100644 --- a/test/_posix.py +++ b/psutil/tests/test_posix.py @@ -4,7 +4,7 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -"""POSIX specific tests. These are implicitly run by test_psutil.py.""" +"""POSIX specific tests.""" import datetime import os @@ -20,16 +20,17 @@ from psutil import SUNOS from psutil._compat import callable from psutil._compat import PY3 -from test_psutil import get_kernel_version -from test_psutil import get_test_subprocess -from test_psutil import PYTHON -from test_psutil import reap_children -from test_psutil import retry_before_failing -from test_psutil import sh -from test_psutil import skip_on_access_denied -from test_psutil import TRAVIS -from test_psutil import unittest -from test_psutil import wait_for_pid +from psutil.tests import get_kernel_version +from psutil.tests import get_test_subprocess +from psutil.tests import PYTHON +from psutil.tests import reap_children +from psutil.tests import retry_before_failing +from psutil.tests import sh +from psutil.tests import skip_on_access_denied +from psutil.tests import test_module_by_name +from psutil.tests import TRAVIS +from psutil.tests import unittest +from psutil.tests import wait_for_pid def ps(cmd): @@ -270,12 +271,5 @@ def test_cwd_proc(self): psutil.Process().cwd()) -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - if __name__ == '__main__': - if not main(): - sys.exit(1) + test_module_by_name(__file__) diff --git a/test/test_psutil.py b/psutil/tests/test_process.py similarity index 56% rename from test/test_psutil.py rename to psutil/tests/test_process.py index b36d4bb7a..c0d51e795 100644 --- a/test/test_psutil.py +++ b/psutil/tests/test_process.py @@ -5,29 +5,12 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -""" -psutil test suite. Run it with: -$ make test +"""Tests for psutil.Process class.""" -If you're on Python < 2.7 unittest2 module must be installed first: -https://pypi.python.org/pypi/unittest2 -""" - -from __future__ import division - -import ast -import atexit import collections import contextlib -import datetime import errno -import functools -import json -import imp import os -import pickle -import pprint -import re import select import shutil import signal @@ -37,11 +20,9 @@ import sys import tempfile import textwrap -import threading import time import traceback import types -import warnings from socket import AF_INET from socket import SOCK_DGRAM from socket import SOCK_STREAM @@ -55,6 +36,7 @@ import mock # requires "pip install mock" import psutil + from psutil import BSD from psutil import FREEBSD from psutil import LINUX @@ -69,1136 +51,40 @@ from psutil._compat import long from psutil._compat import PY3 from psutil._compat import unicode -from psutil._compat import which - -if sys.version_info < (2, 7): - import unittest2 as unittest # requires "pip install unittest2" -else: - import unittest -if sys.version_info >= (3, 4): - import enum -else: - enum = None - -if PY3: - import importlib - # python <=3.3 - if not hasattr(importlib, 'reload'): - import imp as importlib -else: - import imp as importlib - - -# =================================================================== -# --- Constants -# =================================================================== - -# conf for retry_before_failing() decorator -NO_RETRIES = 10 -# bytes tolerance for OS memory related tests -MEMORY_TOLERANCE = 500 * 1024 # 500KB -# the timeout used in functions which have to wait -GLOBAL_TIMEOUT = 3 - -AF_INET6 = getattr(socket, "AF_INET6") -AF_UNIX = getattr(socket, "AF_UNIX", None) -PYTHON = os.path.realpath(sys.executable) -DEVNULL = open(os.devnull, 'r+') -TESTFN = os.path.join(os.getcwd(), "$testfile") -TESTFN_UNICODE = TESTFN + "ƒőő" -TESTFILE_PREFIX = 'psutil-test-suite-' -if not PY3: - try: - TESTFN_UNICODE = unicode(TESTFN_UNICODE, sys.getfilesystemencoding()) - except UnicodeDecodeError: - TESTFN_UNICODE = TESTFN + "???" - -EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), - '..', 'examples')) - -if WINDOWS: - WIN_VISTA = (6, 0, 0) -VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil) - if x.startswith('STATUS_')] -# whether we're running this test suite on Travis (https://travis-ci.org/) -TRAVIS = bool(os.environ.get('TRAVIS')) -# whether we're running this test suite on Appveyor for Windows -# (http://www.appveyor.com/) -APPVEYOR = bool(os.environ.get('APPVEYOR')) - -if TRAVIS or 'tox' in sys.argv[0]: - import ipaddress -if TRAVIS or APPVEYOR: - GLOBAL_TIMEOUT = GLOBAL_TIMEOUT * 4 - - -# =================================================================== -# --- Utility functions -# =================================================================== - -def cleanup(): - reap_children(search_all=True) - safe_remove(TESTFN) - try: - safe_rmdir(TESTFN_UNICODE) - except UnicodeEncodeError: - pass - for path in _testfiles: - safe_remove(path) - -atexit.register(cleanup) -atexit.register(lambda: DEVNULL.close()) - - -_subprocesses_started = set() - - -def get_test_subprocess(cmd=None, wait=False, **kwds): - """Return a subprocess.Popen object to use in tests. - By default stdout and stderr are redirected to /dev/null and the - python interpreter is used as test process. - If 'wait' is True attemps to make sure the process is in a - reasonably initialized state. - """ - if cmd is None: - pyline = "" - if wait: - pyline += "open(r'%s', 'w'); " % TESTFN - # A process living for 30 secs. We sleep N times (as opposed to - # once) in order to be nicer towards Windows which doesn't handle - # interrupt signals properly. - pyline += "import time; [time.sleep(0.01) for x in range(3000)];" - cmd_ = [PYTHON, "-c", pyline] - else: - cmd_ = cmd - kwds.setdefault("stdin", DEVNULL) - kwds.setdefault("stdout", DEVNULL) - kwds.setdefault("stderr", DEVNULL) - sproc = subprocess.Popen(cmd_, **kwds) - if wait: - if cmd is None: - stop_at = time.time() + 3 - while stop_at > time.time(): - if os.path.exists(TESTFN): - break - time.sleep(0.001) - else: - warn("couldn't make sure test file was actually created") - else: - wait_for_pid(sproc.pid) - _subprocesses_started.add(psutil.Process(sproc.pid)) - return sproc - - -_testfiles = [] - - -def pyrun(src): - """Run python code 'src' in a separate interpreter. - Return interpreter subprocess. - """ - if PY3: - src = bytes(src, 'ascii') - with tempfile.NamedTemporaryFile( - prefix=TESTFILE_PREFIX, delete=False) as f: - _testfiles.append(f.name) - f.write(src) - f.flush() - subp = get_test_subprocess([PYTHON, f.name], stdout=None, - stderr=None) - wait_for_pid(subp.pid) - return subp - - -def warn(msg): - """Raise a warning msg.""" - warnings.warn(msg, UserWarning) - - -def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE): - """run cmd in a subprocess and return its output. - raises RuntimeError on error. - """ - p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr) - stdout, stderr = p.communicate() - if p.returncode != 0: - raise RuntimeError(stderr) - if stderr: - warn(stderr) - if PY3: - stdout = str(stdout, sys.stdout.encoding or - sys.getfilesystemencoding()) - return stdout.strip() - - -if POSIX: - def get_kernel_version(): - """Return a tuple such as (2, 6, 36).""" - s = "" - uname = os.uname()[2] - for c in uname: - if c.isdigit() or c == '.': - s += c - else: - break - if not s: - raise ValueError("can't parse %r" % uname) - minor = 0 - micro = 0 - nums = s.split('.') - major = int(nums[0]) - if len(nums) >= 2: - minor = int(nums[1]) - if len(nums) >= 3: - micro = int(nums[2]) - return (major, minor, micro) - - -if LINUX: - RLIMIT_SUPPORT = get_kernel_version() >= (2, 6, 36) -else: - RLIMIT_SUPPORT = False - - -def wait_for_pid(pid, timeout=GLOBAL_TIMEOUT): - """Wait for pid to show up in the process list then return. - Used in the test suite to give time the sub process to initialize. - """ - raise_at = time.time() + timeout - while True: - if pid in psutil.pids(): - # give it one more iteration to allow full initialization - time.sleep(0.01) - return - time.sleep(0.0001) - if time.time() >= raise_at: - raise RuntimeError("Timed out") - - -def wait_for_file(fname, timeout=GLOBAL_TIMEOUT, delete_file=True): - """Wait for a file to be written on disk.""" - stop_at = time.time() + 3 - while time.time() < stop_at: - try: - with open(fname, "r") as f: - data = f.read() - if not data: - continue - if delete_file: - os.remove(fname) - return data - except IOError: - time.sleep(0.001) - raise RuntimeError("timed out (couldn't read file)") - - -def reap_children(search_all=False): - """Kill any subprocess started by this test suite and ensure that - no zombies stick around to hog resources and create problems when - looking for refleaks. - """ - global _subprocesses_started - procs = _subprocesses_started.copy() - if search_all: - this_process = psutil.Process() - for p in this_process.children(recursive=True): - procs.add(p) - for p in procs: - try: - p.terminate() - except psutil.NoSuchProcess: - pass - gone, alive = psutil.wait_procs(procs, timeout=GLOBAL_TIMEOUT) - for p in alive: - warn("couldn't terminate process %s" % p) - try: - p.kill() - except psutil.NoSuchProcess: - pass - _, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT) - if alive: - warn("couldn't not kill processes %s" % str(alive)) - _subprocesses_started = set(alive) - - -def check_ip_address(addr, family): - """Attempts to check IP address's validity.""" - if enum and PY3: - assert isinstance(family, enum.IntEnum), family - if family == AF_INET: - octs = [int(x) for x in addr.split('.')] - assert len(octs) == 4, addr - for num in octs: - assert 0 <= num <= 255, addr - if ipaddress: - if not PY3: - addr = unicode(addr) - ipaddress.IPv4Address(addr) - elif family == AF_INET6: - assert isinstance(addr, str), addr - if ipaddress: - if not PY3: - addr = unicode(addr) - ipaddress.IPv6Address(addr) - elif family == psutil.AF_LINK: - assert re.match('([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr - else: - raise ValueError("unknown family %r", family) - - -def check_connection_ntuple(conn): - """Check validity of a connection namedtuple.""" - valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if - x.startswith('CONN_')] - assert conn[0] == conn.fd - assert conn[1] == conn.family - assert conn[2] == conn.type - assert conn[3] == conn.laddr - assert conn[4] == conn.raddr - assert conn[5] == conn.status - assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type) - assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family) - assert conn.status in valid_conn_states, conn.status - - # check IP address and port sanity - for addr in (conn.laddr, conn.raddr): - if not addr: - continue - if conn.family in (AF_INET, AF_INET6): - assert isinstance(addr, tuple), addr - ip, port = addr - assert isinstance(port, int), port - assert 0 <= port <= 65535, port - check_ip_address(ip, conn.family) - elif conn.family == AF_UNIX: - assert isinstance(addr, (str, None)), addr - else: - raise ValueError("unknown family %r", conn.family) - - if conn.family in (AF_INET, AF_INET6): - # actually try to bind the local socket; ignore IPv6 - # sockets as their address might be represented as - # an IPv4-mapped-address (e.g. "::127.0.0.1") - # and that's rejected by bind() - if conn.family == AF_INET: - s = socket.socket(conn.family, conn.type) - with contextlib.closing(s): - try: - s.bind((conn.laddr[0], 0)) - except socket.error as err: - if err.errno != errno.EADDRNOTAVAIL: - raise - elif conn.family == AF_UNIX: - assert not conn.raddr, repr(conn.raddr) - assert conn.status == psutil.CONN_NONE, conn.status - - if getattr(conn, 'fd', -1) != -1: - assert conn.fd > 0, conn - if hasattr(socket, 'fromfd') and not WINDOWS: - try: - dupsock = socket.fromfd(conn.fd, conn.family, conn.type) - except (socket.error, OSError) as err: - if err.args[0] != errno.EBADF: - raise - else: - with contextlib.closing(dupsock): - assert dupsock.family == conn.family - assert dupsock.type == conn.type - - -def safe_remove(file): - "Convenience function for removing temporary test files" - try: - os.remove(file) - except OSError as err: - if err.errno != errno.ENOENT: - # # file is being used by another process - # if WINDOWS and isinstance(err, WindowsError) and err.errno == 13: - # return - raise - - -def safe_rmdir(dir): - "Convenience function for removing temporary test directories" - try: - os.rmdir(dir) - except OSError as err: - if err.errno != errno.ENOENT: - raise - - -@contextlib.contextmanager -def chdir(dirname): - """Context manager which temporarily changes the current directory.""" - curdir = os.getcwd() - try: - os.chdir(dirname) - yield - finally: - os.chdir(curdir) - - -def call_until(fun, expr, timeout=GLOBAL_TIMEOUT): - """Keep calling function for timeout secs and exit if eval() - expression is True. - """ - stop_at = time.time() + timeout - while time.time() < stop_at: - ret = fun() - if eval(expr): - return ret - time.sleep(0.001) - raise RuntimeError('timed out (ret=%r)' % ret) - - -def retry_before_failing(ntimes=None): - """Decorator which runs a test function and retries N times before - actually failing. - """ - def decorator(fun): - @functools.wraps(fun) - def wrapper(*args, **kwargs): - times = ntimes or NO_RETRIES - assert times, times - exc = None - for x in range(times): - try: - return fun(*args, **kwargs) - except AssertionError as _: - exc = _ - raise exc - return wrapper - return decorator - - -def skip_on_access_denied(only_if=None): - """Decorator to Ignore AccessDenied exceptions.""" - def decorator(fun): - @functools.wraps(fun) - def wrapper(*args, **kwargs): - try: - return fun(*args, **kwargs) - except psutil.AccessDenied: - if only_if is not None: - if not only_if: - raise - msg = "%r was skipped because it raised AccessDenied" \ - % fun.__name__ - raise unittest.SkipTest(msg) - return wrapper - return decorator - - -def skip_on_not_implemented(only_if=None): - """Decorator to Ignore NotImplementedError exceptions.""" - def decorator(fun): - @functools.wraps(fun) - def wrapper(*args, **kwargs): - try: - return fun(*args, **kwargs) - except NotImplementedError: - if only_if is not None: - if not only_if: - raise - msg = "%r was skipped because it raised NotImplementedError" \ - % fun.__name__ - raise unittest.SkipTest(msg) - return wrapper - return decorator - - -def create_temp_executable_file(suffix, code="void main() { pause(); }"): - tmpdir = None - if TRAVIS and OSX: - tmpdir = "/private/tmp" - fd, path = tempfile.mkstemp( - prefix='psu', suffix=suffix, dir=tmpdir) - os.close(fd) - - if which("gcc"): - fd, c_file = tempfile.mkstemp( - prefix='psu', suffix='.c', dir=tmpdir) - os.close(fd) - with open(c_file, "w") as f: - f.write(code) - subprocess.check_call(["gcc", c_file, "-o", path]) - safe_remove(c_file) - else: - # fallback - use python's executable - shutil.copyfile(sys.executable, path) - if POSIX: - st = os.stat(path) - os.chmod(path, st.st_mode | stat.S_IEXEC) - return path - - -if WINDOWS: - def get_winver(): - wv = sys.getwindowsversion() - if hasattr(wv, 'service_pack_major'): # python >= 2.7 - sp = wv.service_pack_major or 0 - else: - r = re.search("\s\d$", wv[4]) - if r: - sp = int(r.group(0)) - else: - sp = 0 - return (wv[0], wv[1], sp) - - -# In Python 3 paths are unicode objects by default. Surrogate escapes -# are used to handle non-character data. -def encode_path(path): - if PY3: - return path.encode(sys.getfilesystemencoding(), - errors="surrogateescape") - else: - return path - - -def decode_path(path): - if PY3: - return path.decode(sys.getfilesystemencoding(), - errors="surrogateescape") - else: - return path - - -class ThreadTask(threading.Thread): - """A thread object used for running process thread tests.""" - - def __init__(self): - threading.Thread.__init__(self) - self._running = False - self._interval = None - self._flag = threading.Event() - - def __repr__(self): - name = self.__class__.__name__ - return '<%s running=%s at %#x>' % (name, self._running, id(self)) - - def start(self, interval=0.001): - """Start thread and keep it running until an explicit - stop() request. Polls for shutdown every 'timeout' seconds. - """ - if self._running: - raise ValueError("already started") - self._interval = interval - threading.Thread.start(self) - self._flag.wait() - - def run(self): - self._running = True - self._flag.set() - while self._running: - time.sleep(self._interval) - - def stop(self): - """Stop thread execution and and waits until it is stopped.""" - if not self._running: - raise ValueError("already stopped") - self._running = False - self.join() - - -# =================================================================== -# --- System-related API tests -# =================================================================== - -class TestSystemAPIs(unittest.TestCase): - """Tests for system-related APIs.""" - - def setUp(self): - safe_remove(TESTFN) - - def tearDown(self): - reap_children() - - def test_process_iter(self): - self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) - sproc = get_test_subprocess() - self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) - p = psutil.Process(sproc.pid) - p.kill() - p.wait() - self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()]) - - with mock.patch('psutil.Process', - side_effect=psutil.NoSuchProcess(os.getpid())): - self.assertEqual(list(psutil.process_iter()), []) - with mock.patch('psutil.Process', - side_effect=psutil.AccessDenied(os.getpid())): - with self.assertRaises(psutil.AccessDenied): - list(psutil.process_iter()) - - def test_wait_procs(self): - def callback(p): - l.append(p.pid) - - l = [] - sproc1 = get_test_subprocess() - sproc2 = get_test_subprocess() - sproc3 = get_test_subprocess() - procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] - self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) - self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) - t = time.time() - gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback) - - self.assertLess(time.time() - t, 0.5) - self.assertEqual(gone, []) - self.assertEqual(len(alive), 3) - self.assertEqual(l, []) - for p in alive: - self.assertFalse(hasattr(p, 'returncode')) - - @retry_before_failing(30) - def test(procs, callback): - gone, alive = psutil.wait_procs(procs, timeout=0.03, - callback=callback) - self.assertEqual(len(gone), 1) - self.assertEqual(len(alive), 2) - return gone, alive - - sproc3.terminate() - gone, alive = test(procs, callback) - self.assertIn(sproc3.pid, [x.pid for x in gone]) - if POSIX: - self.assertEqual(gone.pop().returncode, signal.SIGTERM) - else: - self.assertEqual(gone.pop().returncode, 1) - self.assertEqual(l, [sproc3.pid]) - for p in alive: - self.assertFalse(hasattr(p, 'returncode')) - - @retry_before_failing(30) - def test(procs, callback): - gone, alive = psutil.wait_procs(procs, timeout=0.03, - callback=callback) - self.assertEqual(len(gone), 3) - self.assertEqual(len(alive), 0) - return gone, alive - - sproc1.terminate() - sproc2.terminate() - gone, alive = test(procs, callback) - self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid])) - for p in gone: - self.assertTrue(hasattr(p, 'returncode')) - - def test_wait_procs_no_timeout(self): - sproc1 = get_test_subprocess() - sproc2 = get_test_subprocess() - sproc3 = get_test_subprocess() - procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] - for p in procs: - p.terminate() - gone, alive = psutil.wait_procs(procs) - - def test_boot_time(self): - bt = psutil.boot_time() - self.assertIsInstance(bt, float) - self.assertGreater(bt, 0) - self.assertLess(bt, time.time()) - - @unittest.skipUnless(POSIX, 'posix only') - def test_PAGESIZE(self): - # pagesize is used internally to perform different calculations - # and it's determined by using SC_PAGE_SIZE; make sure - # getpagesize() returns the same value. - import resource - self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize()) - - def test_virtual_memory(self): - mem = psutil.virtual_memory() - assert mem.total > 0, mem - assert mem.available > 0, mem - assert 0 <= mem.percent <= 100, mem - assert mem.used > 0, mem - assert mem.free >= 0, mem - for name in mem._fields: - value = getattr(mem, name) - if name != 'percent': - self.assertIsInstance(value, (int, long)) - if name != 'total': - if not value >= 0: - self.fail("%r < 0 (%s)" % (name, value)) - if value > mem.total: - self.fail("%r > total (total=%s, %s=%s)" - % (name, mem.total, name, value)) - - def test_swap_memory(self): - mem = psutil.swap_memory() - assert mem.total >= 0, mem - assert mem.used >= 0, mem - if mem.total > 0: - # likely a system with no swap partition - assert mem.free > 0, mem - else: - assert mem.free == 0, mem - assert 0 <= mem.percent <= 100, mem - assert mem.sin >= 0, mem - assert mem.sout >= 0, mem - - def test_pid_exists(self): - sproc = get_test_subprocess(wait=True) - self.assertTrue(psutil.pid_exists(sproc.pid)) - p = psutil.Process(sproc.pid) - p.kill() - p.wait() - self.assertFalse(psutil.pid_exists(sproc.pid)) - self.assertFalse(psutil.pid_exists(-1)) - self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) - # pid 0 - psutil.pid_exists(0) == 0 in psutil.pids() - - def test_pid_exists_2(self): - reap_children() - pids = psutil.pids() - for pid in pids: - try: - assert psutil.pid_exists(pid) - except AssertionError: - # in case the process disappeared in meantime fail only - # if it is no longer in psutil.pids() - time.sleep(.1) - if pid in psutil.pids(): - self.fail(pid) - pids = range(max(pids) + 5000, max(pids) + 6000) - for pid in pids: - self.assertFalse(psutil.pid_exists(pid), msg=pid) - - def test_pids(self): - plist = [x.pid for x in psutil.process_iter()] - pidlist = psutil.pids() - self.assertEqual(plist.sort(), pidlist.sort()) - # make sure every pid is unique - self.assertEqual(len(pidlist), len(set(pidlist))) - - def test_test(self): - # test for psutil.test() function - stdout = sys.stdout - sys.stdout = DEVNULL - try: - psutil.test() - finally: - sys.stdout = stdout - - def test_cpu_count(self): - logical = psutil.cpu_count() - self.assertEqual(logical, len(psutil.cpu_times(percpu=True))) - self.assertGreaterEqual(logical, 1) - # - if os.path.exists("/proc/cpuinfo"): - with open("/proc/cpuinfo") as fd: - cpuinfo_data = fd.read() - if "physical id" not in cpuinfo_data: - raise unittest.SkipTest("cpuinfo doesn't include physical id") - physical = psutil.cpu_count(logical=False) - self.assertGreaterEqual(physical, 1) - self.assertGreaterEqual(logical, physical) - - def test_sys_cpu_times(self): - total = 0 - times = psutil.cpu_times() - sum(times) - for cp_time in times: - self.assertIsInstance(cp_time, float) - self.assertGreaterEqual(cp_time, 0.0) - total += cp_time - self.assertEqual(total, sum(times)) - str(times) - # CPU times are always supposed to increase over time - # or at least remain the same and that's because time - # cannot go backwards. - # Surprisingly sometimes this might not be the case (at - # least on Windows and Linux), see: - # https://github.com/giampaolo/psutil/issues/392 - # https://github.com/giampaolo/psutil/issues/645 - # if not WINDOWS: - # last = psutil.cpu_times() - # for x in range(100): - # new = psutil.cpu_times() - # for field in new._fields: - # new_t = getattr(new, field) - # last_t = getattr(last, field) - # self.assertGreaterEqual(new_t, last_t, - # msg="%s %s" % (new_t, last_t)) - # last = new - - def test_sys_cpu_times2(self): - t1 = sum(psutil.cpu_times()) - time.sleep(0.1) - t2 = sum(psutil.cpu_times()) - difference = t2 - t1 - if not difference >= 0.05: - self.fail("difference %s" % difference) - - def test_sys_per_cpu_times(self): - for times in psutil.cpu_times(percpu=True): - total = 0 - sum(times) - for cp_time in times: - self.assertIsInstance(cp_time, float) - self.assertGreaterEqual(cp_time, 0.0) - total += cp_time - self.assertEqual(total, sum(times)) - str(times) - self.assertEqual(len(psutil.cpu_times(percpu=True)[0]), - len(psutil.cpu_times(percpu=False))) - - # Note: in theory CPU times are always supposed to increase over - # time or remain the same but never go backwards. In practice - # sometimes this is not the case. - # This issue seemd to be afflict Windows: - # https://github.com/giampaolo/psutil/issues/392 - # ...but it turns out also Linux (rarely) behaves the same. - # last = psutil.cpu_times(percpu=True) - # for x in range(100): - # new = psutil.cpu_times(percpu=True) - # for index in range(len(new)): - # newcpu = new[index] - # lastcpu = last[index] - # for field in newcpu._fields: - # new_t = getattr(newcpu, field) - # last_t = getattr(lastcpu, field) - # self.assertGreaterEqual( - # new_t, last_t, msg="%s %s" % (lastcpu, newcpu)) - # last = new - - def test_sys_per_cpu_times_2(self): - tot1 = psutil.cpu_times(percpu=True) - stop_at = time.time() + 0.1 - while True: - if time.time() >= stop_at: - break - tot2 = psutil.cpu_times(percpu=True) - for t1, t2 in zip(tot1, tot2): - t1, t2 = sum(t1), sum(t2) - difference = t2 - t1 - if difference >= 0.05: - return - self.fail() - - def _test_cpu_percent(self, percent, last_ret, new_ret): - try: - self.assertIsInstance(percent, float) - self.assertGreaterEqual(percent, 0.0) - self.assertIsNot(percent, -0.0) - self.assertLessEqual(percent, 100.0 * psutil.cpu_count()) - except AssertionError as err: - raise AssertionError("\n%s\nlast=%s\nnew=%s" % ( - err, pprint.pformat(last_ret), pprint.pformat(new_ret))) - - def test_sys_cpu_percent(self): - last = psutil.cpu_percent(interval=0.001) - for x in range(100): - new = psutil.cpu_percent(interval=None) - self._test_cpu_percent(new, last, new) - last = new - - def test_sys_per_cpu_percent(self): - last = psutil.cpu_percent(interval=0.001, percpu=True) - self.assertEqual(len(last), psutil.cpu_count()) - for x in range(100): - new = psutil.cpu_percent(interval=None, percpu=True) - for percent in new: - self._test_cpu_percent(percent, last, new) - last = new - - def test_sys_cpu_times_percent(self): - last = psutil.cpu_times_percent(interval=0.001) - for x in range(100): - new = psutil.cpu_times_percent(interval=None) - for percent in new: - self._test_cpu_percent(percent, last, new) - self._test_cpu_percent(sum(new), last, new) - last = new - - def test_sys_per_cpu_times_percent(self): - last = psutil.cpu_times_percent(interval=0.001, percpu=True) - self.assertEqual(len(last), psutil.cpu_count()) - for x in range(100): - new = psutil.cpu_times_percent(interval=None, percpu=True) - for cpu in new: - for percent in cpu: - self._test_cpu_percent(percent, last, new) - self._test_cpu_percent(sum(cpu), last, new) - last = new - - def test_sys_per_cpu_times_percent_negative(self): - # see: https://github.com/giampaolo/psutil/issues/645 - psutil.cpu_times_percent(percpu=True) - zero_times = [x._make([0 for x in range(len(x._fields))]) - for x in psutil.cpu_times(percpu=True)] - with mock.patch('psutil.cpu_times', return_value=zero_times): - for cpu in psutil.cpu_times_percent(percpu=True): - for percent in cpu: - self._test_cpu_percent(percent, None, None) - - @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), - "os.statvfs() function not available on this platform") - def test_disk_usage(self): - usage = psutil.disk_usage(os.getcwd()) - assert usage.total > 0, usage - assert usage.used > 0, usage - assert usage.free > 0, usage - assert usage.total > usage.used, usage - assert usage.total > usage.free, usage - assert 0 <= usage.percent <= 100, usage.percent - if hasattr(shutil, 'disk_usage'): - # py >= 3.3, see: http://bugs.python.org/issue12442 - shutil_usage = shutil.disk_usage(os.getcwd()) - tolerance = 5 * 1024 * 1024 # 5MB - self.assertEqual(usage.total, shutil_usage.total) - self.assertAlmostEqual(usage.free, shutil_usage.free, - delta=tolerance) - self.assertAlmostEqual(usage.used, shutil_usage.used, - delta=tolerance) - - # if path does not exist OSError ENOENT is expected across - # all platforms - fname = tempfile.mktemp() - try: - psutil.disk_usage(fname) - except OSError as err: - if err.args[0] != errno.ENOENT: - raise - else: - self.fail("OSError not raised") - - @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), - "os.statvfs() function not available on this platform") - def test_disk_usage_unicode(self): - # see: https://github.com/giampaolo/psutil/issues/416 - # XXX this test is not really reliable as it always fails on - # Python 3.X (2.X is fine) - try: - safe_rmdir(TESTFN_UNICODE) - os.mkdir(TESTFN_UNICODE) - psutil.disk_usage(TESTFN_UNICODE) - safe_rmdir(TESTFN_UNICODE) - except UnicodeEncodeError: - pass - - @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), - "os.statvfs() function not available on this platform") - @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis") - def test_disk_partitions(self): - # all = False - ls = psutil.disk_partitions(all=False) - # on travis we get: - # self.assertEqual(p.cpu_affinity(), [n]) - # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0] - self.assertTrue(ls, msg=ls) - for disk in ls: - if WINDOWS and 'cdrom' in disk.opts: - continue - if not POSIX: - assert os.path.exists(disk.device), disk - else: - # we cannot make any assumption about this, see: - # http://goo.gl/p9c43 - disk.device - if SUNOS: - # on solaris apparently mount points can also be files - assert os.path.exists(disk.mountpoint), disk - else: - assert os.path.isdir(disk.mountpoint), disk - assert disk.fstype, disk - self.assertIsInstance(disk.opts, str) - - # all = True - ls = psutil.disk_partitions(all=True) - self.assertTrue(ls, msg=ls) - for disk in psutil.disk_partitions(all=True): - if not WINDOWS: - try: - os.stat(disk.mountpoint) - except OSError as err: - if TRAVIS and OSX and err.errno == errno.EIO: - continue - # http://mail.python.org/pipermail/python-dev/ - # 2012-June/120787.html - if err.errno not in (errno.EPERM, errno.EACCES): - raise - else: - if SUNOS: - # on solaris apparently mount points can also be files - assert os.path.exists(disk.mountpoint), disk - else: - assert os.path.isdir(disk.mountpoint), disk - self.assertIsInstance(disk.fstype, str) - self.assertIsInstance(disk.opts, str) - - def find_mount_point(path): - path = os.path.abspath(path) - while not os.path.ismount(path): - path = os.path.dirname(path) - return path - - mount = find_mount_point(__file__) - mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] - self.assertIn(mount, mounts) - psutil.disk_usage(mount) - - @skip_on_access_denied() - def test_net_connections(self): - def check(cons, families, types_): - for conn in cons: - self.assertIn(conn.family, families, msg=conn) - if conn.family != getattr(socket, 'AF_UNIX', object()): - self.assertIn(conn.type, types_, msg=conn) - - from psutil._common import conn_tmap - for kind, groups in conn_tmap.items(): - if SUNOS and kind == 'unix': - continue - families, types_ = groups - cons = psutil.net_connections(kind) - self.assertEqual(len(cons), len(set(cons))) - check(cons, families, types_) - - def test_net_io_counters(self): - def check_ntuple(nt): - self.assertEqual(nt[0], nt.bytes_sent) - self.assertEqual(nt[1], nt.bytes_recv) - self.assertEqual(nt[2], nt.packets_sent) - self.assertEqual(nt[3], nt.packets_recv) - self.assertEqual(nt[4], nt.errin) - self.assertEqual(nt[5], nt.errout) - self.assertEqual(nt[6], nt.dropin) - self.assertEqual(nt[7], nt.dropout) - assert nt.bytes_sent >= 0, nt - assert nt.bytes_recv >= 0, nt - assert nt.packets_sent >= 0, nt - assert nt.packets_recv >= 0, nt - assert nt.errin >= 0, nt - assert nt.errout >= 0, nt - assert nt.dropin >= 0, nt - assert nt.dropout >= 0, nt - - ret = psutil.net_io_counters(pernic=False) - check_ntuple(ret) - ret = psutil.net_io_counters(pernic=True) - self.assertNotEqual(ret, []) - for key in ret: - self.assertTrue(key) - check_ntuple(ret[key]) - - def test_net_if_addrs(self): - nics = psutil.net_if_addrs() - assert nics, nics - - # Not reliable on all platforms (net_if_addrs() reports more - # interfaces). - # self.assertEqual(sorted(nics.keys()), - # sorted(psutil.net_io_counters(pernic=True).keys())) - - families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK]) - for nic, addrs in nics.items(): - self.assertEqual(len(set(addrs)), len(addrs)) - for addr in addrs: - self.assertIsInstance(addr.family, int) - self.assertIsInstance(addr.address, str) - self.assertIsInstance(addr.netmask, (str, type(None))) - self.assertIsInstance(addr.broadcast, (str, type(None))) - self.assertIn(addr.family, families) - if sys.version_info >= (3, 4): - self.assertIsInstance(addr.family, enum.IntEnum) - if addr.family == socket.AF_INET: - s = socket.socket(addr.family) - with contextlib.closing(s): - s.bind((addr.address, 0)) - elif addr.family == socket.AF_INET6: - info = socket.getaddrinfo( - addr.address, 0, socket.AF_INET6, socket.SOCK_STREAM, - 0, socket.AI_PASSIVE)[0] - af, socktype, proto, canonname, sa = info - s = socket.socket(af, socktype, proto) - with contextlib.closing(s): - s.bind(sa) - for ip in (addr.address, addr.netmask, addr.broadcast, - addr.ptp): - if ip is not None: - # TODO: skip AF_INET6 for now because I get: - # AddressValueError: Only hex digits permitted in - # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0' - if addr.family != AF_INET6: - check_ip_address(ip, addr.family) - # broadcast and ptp addresses are mutually exclusive - if addr.broadcast: - self.assertIsNone(addr.ptp) - elif addr.ptp: - self.assertIsNone(addr.broadcast) - - if BSD or OSX or SUNOS: - if hasattr(socket, "AF_LINK"): - self.assertEqual(psutil.AF_LINK, socket.AF_LINK) - elif LINUX: - self.assertEqual(psutil.AF_LINK, socket.AF_PACKET) - elif WINDOWS: - self.assertEqual(psutil.AF_LINK, -1) - - @unittest.skipIf(TRAVIS, "EPERM on travis") - def test_net_if_stats(self): - nics = psutil.net_if_stats() - assert nics, nics - all_duplexes = (psutil.NIC_DUPLEX_FULL, - psutil.NIC_DUPLEX_HALF, - psutil.NIC_DUPLEX_UNKNOWN) - for nic, stats in nics.items(): - isup, duplex, speed, mtu = stats - self.assertIsInstance(isup, bool) - self.assertIn(duplex, all_duplexes) - self.assertIn(duplex, all_duplexes) - self.assertGreaterEqual(speed, 0) - self.assertGreaterEqual(mtu, 0) - - @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), - '/proc/diskstats not available on this linux version') - @unittest.skipIf(APPVEYOR, - "can't find any physical disk on Appveyor") - def test_disk_io_counters(self): - def check_ntuple(nt): - self.assertEqual(nt[0], nt.read_count) - self.assertEqual(nt[1], nt.write_count) - self.assertEqual(nt[2], nt.read_bytes) - self.assertEqual(nt[3], nt.write_bytes) - self.assertEqual(nt[4], nt.read_time) - self.assertEqual(nt[5], nt.write_time) - assert nt.read_count >= 0, nt - assert nt.write_count >= 0, nt - assert nt.read_bytes >= 0, nt - assert nt.write_bytes >= 0, nt - assert nt.read_time >= 0, nt - assert nt.write_time >= 0, nt - - ret = psutil.disk_io_counters(perdisk=False) - check_ntuple(ret) - ret = psutil.disk_io_counters(perdisk=True) - # make sure there are no duplicates - self.assertEqual(len(ret), len(set(ret))) - for key in ret: - assert key, key - check_ntuple(ret[key]) - if LINUX and key[-1].isdigit(): - # if 'sda1' is listed 'sda' shouldn't, see: - # https://github.com/giampaolo/psutil/issues/338 - while key[-1].isdigit(): - key = key[:-1] - self.assertNotIn(key, ret.keys()) - - def test_users(self): - users = psutil.users() - if not APPVEYOR: - self.assertNotEqual(users, []) - for user in users: - assert user.name, user - user.terminal - user.host - assert user.started > 0.0, user - datetime.datetime.fromtimestamp(user.started) +from psutil.tests import AF_INET6 +from psutil.tests import AF_UNIX +from psutil.tests import APPVEYOR +from psutil.tests import call_until +from psutil.tests import chdir +from psutil.tests import check_connection_ntuple +from psutil.tests import create_temp_executable_file +from psutil.tests import decode_path +from psutil.tests import encode_path +from psutil.tests import enum +from psutil.tests import get_test_subprocess +from psutil.tests import get_winver +from psutil.tests import GLOBAL_TIMEOUT +from psutil.tests import pyrun +from psutil.tests import PYTHON +from psutil.tests import reap_children +from psutil.tests import RLIMIT_SUPPORT +from psutil.tests import safe_remove +from psutil.tests import safe_rmdir +from psutil.tests import sh +from psutil.tests import skip_on_access_denied +from psutil.tests import skip_on_not_implemented +from psutil.tests import test_module_by_name +from psutil.tests import TESTFILE_PREFIX +from psutil.tests import TESTFN +from psutil.tests import ThreadTask +from psutil.tests import TRAVIS +from psutil.tests import unittest +from psutil.tests import VALID_PROC_STATUSES +from psutil.tests import wait_for_file +from psutil.tests import wait_for_pid +from psutil.tests import warn +from psutil.tests import which +from psutil.tests import WIN_VISTA # =================================================================== @@ -2538,6 +1424,10 @@ def test_environ(self): ): d.pop(key, None) d2.pop(key, None) + if LINUX: + for key in ('PLAT', ): + d.pop(key, None) + d2.pop(key, None) self.assertEqual(d, d2) @@ -2911,389 +1801,9 @@ def test_zombie_process(self): # =================================================================== -# --- Misc tests -# =================================================================== - -class TestMisc(unittest.TestCase): - """Misc / generic tests.""" - - def test_process__repr__(self, func=repr): - p = psutil.Process() - r = func(p) - self.assertIn("psutil.Process", r) - self.assertIn("pid=%s" % p.pid, r) - self.assertIn("name=", r) - self.assertIn(p.name(), r) - with mock.patch.object(psutil.Process, "name", - side_effect=psutil.ZombieProcess(os.getpid())): - p = psutil.Process() - r = func(p) - self.assertIn("pid=%s" % p.pid, r) - self.assertIn("zombie", r) - self.assertNotIn("name=", r) - with mock.patch.object(psutil.Process, "name", - side_effect=psutil.NoSuchProcess(os.getpid())): - p = psutil.Process() - r = func(p) - self.assertIn("pid=%s" % p.pid, r) - self.assertIn("terminated", r) - self.assertNotIn("name=", r) - with mock.patch.object(psutil.Process, "name", - side_effect=psutil.AccessDenied(os.getpid())): - p = psutil.Process() - r = func(p) - self.assertIn("pid=%s" % p.pid, r) - self.assertNotIn("name=", r) - - def test_process__str__(self): - self.test_process__repr__(func=str) - - def test_no_such_process__repr__(self, func=repr): - self.assertEqual( - repr(psutil.NoSuchProcess(321)), - "psutil.NoSuchProcess process no longer exists (pid=321)") - self.assertEqual( - repr(psutil.NoSuchProcess(321, name='foo')), - "psutil.NoSuchProcess process no longer exists (pid=321, " - "name='foo')") - self.assertEqual( - repr(psutil.NoSuchProcess(321, msg='foo')), - "psutil.NoSuchProcess foo") - - def test_zombie_process__repr__(self, func=repr): - self.assertEqual( - repr(psutil.ZombieProcess(321)), - "psutil.ZombieProcess process still exists but it's a zombie " - "(pid=321)") - self.assertEqual( - repr(psutil.ZombieProcess(321, name='foo')), - "psutil.ZombieProcess process still exists but it's a zombie " - "(pid=321, name='foo')") - self.assertEqual( - repr(psutil.ZombieProcess(321, name='foo', ppid=1)), - "psutil.ZombieProcess process still exists but it's a zombie " - "(pid=321, name='foo', ppid=1)") - self.assertEqual( - repr(psutil.ZombieProcess(321, msg='foo')), - "psutil.ZombieProcess foo") - - def test_access_denied__repr__(self, func=repr): - self.assertEqual( - repr(psutil.AccessDenied(321)), - "psutil.AccessDenied (pid=321)") - self.assertEqual( - repr(psutil.AccessDenied(321, name='foo')), - "psutil.AccessDenied (pid=321, name='foo')") - self.assertEqual( - repr(psutil.AccessDenied(321, msg='foo')), - "psutil.AccessDenied foo") - - def test_timeout_expired__repr__(self, func=repr): - self.assertEqual( - repr(psutil.TimeoutExpired(321)), - "psutil.TimeoutExpired timeout after 321 seconds") - self.assertEqual( - repr(psutil.TimeoutExpired(321, pid=111)), - "psutil.TimeoutExpired timeout after 321 seconds (pid=111)") - self.assertEqual( - repr(psutil.TimeoutExpired(321, pid=111, name='foo')), - "psutil.TimeoutExpired timeout after 321 seconds " - "(pid=111, name='foo')") - - def test_process__eq__(self): - p1 = psutil.Process() - p2 = psutil.Process() - self.assertEqual(p1, p2) - p2._ident = (0, 0) - self.assertNotEqual(p1, p2) - self.assertNotEqual(p1, 'foo') - - def test_process__hash__(self): - s = set([psutil.Process(), psutil.Process()]) - self.assertEqual(len(s), 1) - - def test__all__(self): - dir_psutil = dir(psutil) - for name in dir_psutil: - if name in ('callable', 'error', 'namedtuple', - 'long', 'test', 'NUM_CPUS', 'BOOT_TIME', - 'TOTAL_PHYMEM'): - continue - if not name.startswith('_'): - try: - __import__(name) - except ImportError: - if name not in psutil.__all__: - fun = getattr(psutil, name) - if fun is None: - continue - if (fun.__doc__ is not None and - 'deprecated' not in fun.__doc__.lower()): - self.fail('%r not in psutil.__all__' % name) - - # Import 'star' will break if __all__ is inconsistent, see: - # https://github.com/giampaolo/psutil/issues/656 - # Can't do `from psutil import *` as it won't work on python 3 - # so we simply iterate over __all__. - for name in psutil.__all__: - self.assertIn(name, dir_psutil) - - def test_version(self): - self.assertEqual('.'.join([str(x) for x in psutil.version_info]), - psutil.__version__) - - def test_memoize(self): - from psutil._common import memoize - - @memoize - def foo(*args, **kwargs): - "foo docstring" - calls.append(None) - return (args, kwargs) - - calls = [] - # no args - for x in range(2): - ret = foo() - expected = ((), {}) - self.assertEqual(ret, expected) - self.assertEqual(len(calls), 1) - # with args - for x in range(2): - ret = foo(1) - expected = ((1, ), {}) - self.assertEqual(ret, expected) - self.assertEqual(len(calls), 2) - # with args + kwargs - for x in range(2): - ret = foo(1, bar=2) - expected = ((1, ), {'bar': 2}) - self.assertEqual(ret, expected) - self.assertEqual(len(calls), 3) - # clear cache - foo.cache_clear() - ret = foo() - expected = ((), {}) - self.assertEqual(ret, expected) - self.assertEqual(len(calls), 4) - # docstring - self.assertEqual(foo.__doc__, "foo docstring") - - def test_parse_environ_block(self): - from psutil._common import parse_environ_block - - def k(s): - return s.upper() if WINDOWS else s - - self.assertEqual(parse_environ_block("a=1\0"), - {k("a"): "1"}) - self.assertEqual(parse_environ_block("a=1\0b=2\0\0"), - {k("a"): "1", k("b"): "2"}) - self.assertEqual(parse_environ_block("a=1\0b=\0\0"), - {k("a"): "1", k("b"): ""}) - # ignore everything after \0\0 - self.assertEqual(parse_environ_block("a=1\0b=2\0\0c=3\0"), - {k("a"): "1", k("b"): "2"}) - # ignore everything that is not an assignment - self.assertEqual(parse_environ_block("xxx\0a=1\0"), {k("a"): "1"}) - self.assertEqual(parse_environ_block("a=1\0=b=2\0"), {k("a"): "1"}) - # do not fail if the block is incomplete - self.assertEqual(parse_environ_block("a=1\0b=2"), {k("a"): "1"}) - - def test_supports_ipv6(self): - if supports_ipv6(): - with mock.patch('psutil._common.socket') as s: - s.has_ipv6 = False - assert not supports_ipv6() - with mock.patch('psutil._common.socket.socket', - side_effect=socket.error) as s: - assert not supports_ipv6() - assert s.called - with mock.patch('psutil._common.socket.socket', - side_effect=socket.gaierror) as s: - assert not supports_ipv6() - assert s.called - with mock.patch('psutil._common.socket.socket.bind', - side_effect=socket.gaierror) as s: - assert not supports_ipv6() - assert s.called - else: - if hasattr(socket, 'AF_INET6'): - with self.assertRaises(Exception): - sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - sock.bind(("::1", 0)) - - def test_isfile_strict(self): - from psutil._common import isfile_strict - this_file = os.path.abspath(__file__) - assert isfile_strict(this_file) - assert not isfile_strict(os.path.dirname(this_file)) - with mock.patch('psutil._common.os.stat', - side_effect=OSError(errno.EPERM, "foo")): - self.assertRaises(OSError, isfile_strict, this_file) - with mock.patch('psutil._common.os.stat', - side_effect=OSError(errno.EACCES, "foo")): - self.assertRaises(OSError, isfile_strict, this_file) - with mock.patch('psutil._common.os.stat', - side_effect=OSError(errno.EINVAL, "foo")): - assert not isfile_strict(this_file) - with mock.patch('psutil._common.stat.S_ISREG', return_value=False): - assert not isfile_strict(this_file) - - def test_serialization(self): - def check(ret): - if json is not None: - json.loads(json.dumps(ret)) - a = pickle.dumps(ret) - b = pickle.loads(a) - self.assertEqual(ret, b) - - check(psutil.Process().as_dict()) - check(psutil.virtual_memory()) - check(psutil.swap_memory()) - check(psutil.cpu_times()) - check(psutil.cpu_times_percent(interval=0)) - check(psutil.net_io_counters()) - if LINUX and not os.path.exists('/proc/diskstats'): - pass - else: - if not APPVEYOR: - check(psutil.disk_io_counters()) - check(psutil.disk_partitions()) - check(psutil.disk_usage(os.getcwd())) - check(psutil.users()) - - def test_setup_script(self): - here = os.path.abspath(os.path.dirname(__file__)) - setup_py = os.path.realpath(os.path.join(here, '..', 'setup.py')) - module = imp.load_source('setup', setup_py) - self.assertRaises(SystemExit, module.setup) - self.assertEqual(module.get_version(), psutil.__version__) - - def test_ad_on_process_creation(self): - # We are supposed to be able to instantiate Process also in case - # of zombie processes or access denied. - with mock.patch.object(psutil.Process, 'create_time', - side_effect=psutil.AccessDenied) as meth: - psutil.Process() - assert meth.called - with mock.patch.object(psutil.Process, 'create_time', - side_effect=psutil.ZombieProcess(1)) as meth: - psutil.Process() - assert meth.called - with mock.patch.object(psutil.Process, 'create_time', - side_effect=ValueError) as meth: - with self.assertRaises(ValueError): - psutil.Process() - assert meth.called - - def test_psutil_is_reloadable(self): - importlib.reload(psutil) - - -# =================================================================== -# --- Example script tests +# --- Unicode tests # =================================================================== -class TestExampleScripts(unittest.TestCase): - """Tests for scripts in the examples directory.""" - - def assert_stdout(self, exe, args=None): - exe = os.path.join(EXAMPLES_DIR, exe) - if args: - exe = exe + ' ' + args - try: - out = sh(sys.executable + ' ' + exe).strip() - except RuntimeError as err: - if 'AccessDenied' in str(err): - return str(err) - else: - raise - assert out, out - return out - - def assert_syntax(self, exe, args=None): - exe = os.path.join(EXAMPLES_DIR, exe) - with open(exe, 'r') as f: - src = f.read() - ast.parse(src) - - def test_check_presence(self): - # make sure all example scripts have a test method defined - meths = dir(self) - for name in os.listdir(EXAMPLES_DIR): - if name.endswith('.py'): - if 'test_' + os.path.splitext(name)[0] not in meths: - # self.assert_stdout(name) - self.fail('no test defined for %r script' - % os.path.join(EXAMPLES_DIR, name)) - - @unittest.skipUnless(POSIX, "UNIX only") - def test_executable(self): - for name in os.listdir(EXAMPLES_DIR): - if name.endswith('.py'): - path = os.path.join(EXAMPLES_DIR, name) - if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]: - self.fail('%r is not executable' % path) - - def test_disk_usage(self): - self.assert_stdout('disk_usage.py') - - def test_free(self): - self.assert_stdout('free.py') - - def test_meminfo(self): - self.assert_stdout('meminfo.py') - - def test_process_detail(self): - self.assert_stdout('process_detail.py') - - @unittest.skipIf(APPVEYOR, "can't find users on Appveyor") - def test_who(self): - self.assert_stdout('who.py') - - def test_ps(self): - self.assert_stdout('ps.py') - - def test_pstree(self): - self.assert_stdout('pstree.py') - - def test_netstat(self): - self.assert_stdout('netstat.py') - - @unittest.skipIf(TRAVIS, "permission denied on travis") - def test_ifconfig(self): - self.assert_stdout('ifconfig.py') - - @unittest.skipIf(OPENBSD or NETBSD, "memory maps not supported") - def test_pmap(self): - self.assert_stdout('pmap.py', args=str(os.getpid())) - - @unittest.skipIf(ast is None, - 'ast module not available on this python version') - def test_killall(self): - self.assert_syntax('killall.py') - - @unittest.skipIf(ast is None, - 'ast module not available on this python version') - def test_nettop(self): - self.assert_syntax('nettop.py') - - @unittest.skipIf(ast is None, - 'ast module not available on this python version') - def test_top(self): - self.assert_syntax('top.py') - - @unittest.skipIf(ast is None, - 'ast module not available on this python version') - def test_iotop(self): - self.assert_syntax('iotop.py') - - def test_pidof(self): - output = self.assert_stdout('pidof.py %s' % psutil.Process().name()) - self.assertIn(str(os.getpid()), output) - - class TestUnicode(unittest.TestCase): # See: https://github.com/giampaolo/psutil/issues/655 @@ -3457,53 +1967,5 @@ def test_proc_open_files(self): self.assertIn(funny_file, encode_path(path)) -def main(): - tests = [] - test_suite = unittest.TestSuite() - tests.append(TestSystemAPIs) - tests.append(TestProcess) - tests.append(TestFetchAllProcesses) - tests.append(TestMisc) - tests.append(TestExampleScripts) - tests.append(LimitedUserTestCase) - tests.append(TestUnicode) - tests.append(TestNonUnicode) - - if POSIX: - from _posix import PosixSpecificTestCase - tests.append(PosixSpecificTestCase) - - # import the specific platform test suite - stc = None - if LINUX: - from _linux import LinuxSpecificTestCase as stc - elif WINDOWS: - from _windows import WindowsSpecificTestCase as stc - from _windows import TestDualProcessImplementation - from _windows import RemoteProcessTestCase - tests.append(TestDualProcessImplementation) - tests.append(RemoteProcessTestCase) - elif OSX: - from _osx import OSXSpecificTestCase as stc - elif SUNOS: - from _sunos import SunOSSpecificTestCase as stc - elif BSD: - from _bsd import BSDSpecificTestCase as stc - if FREEBSD: - from _bsd import FreeBSDSpecificTestCase - tests.append(FreeBSDSpecificTestCase) - elif OPENBSD: - from _bsd import OpenBSDSpecificTestCase - tests.append(OpenBSDSpecificTestCase) - - if stc is not None: - tests.append(stc) - - for test_class in tests: - test_suite.addTest(unittest.makeSuite(test_class)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - if __name__ == '__main__': - if not main(): - sys.exit(1) + test_module_by_name(__file__) diff --git a/test/_sunos.py b/psutil/tests/test_sunos.py similarity index 71% rename from test/_sunos.py rename to psutil/tests/test_sunos.py index 83fcade37..6a6bdc274 100644 --- a/test/_sunos.py +++ b/psutil/tests/test_sunos.py @@ -4,15 +4,15 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -"""Sun OS specific tests. These are implicitly run by test_psutil.py.""" +"""Sun OS specific tests.""" -import sys import os import psutil from psutil import SUNOS -from test_psutil import sh -from test_psutil import unittest +from psutil.tests import sh +from psutil.tests import test_module_by_name +from psutil.tests import unittest @unittest.skipUnless(SUNOS, "not a SunOS system") @@ -37,12 +37,5 @@ def test_swap_memory(self): self.assertEqual(psutil_swap.free, free) -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(SunOSSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - if __name__ == '__main__': - if not main(): - sys.exit(1) + test_module_by_name(__file__) diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py new file mode 100644 index 000000000..e1129a652 --- /dev/null +++ b/psutil/tests/test_system.py @@ -0,0 +1,649 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Tests for system APIS.""" + +import contextlib +import datetime +import errno +import os +import pprint +import shutil +import signal +import socket +import sys +import tempfile +import time + +import psutil +from psutil import BSD +from psutil import LINUX +from psutil import OSX +from psutil import POSIX +from psutil import SUNOS +from psutil import WINDOWS +from psutil.tests import AF_INET6 +from psutil.tests import APPVEYOR +from psutil.tests import check_ip_address +from psutil.tests import DEVNULL +from psutil.tests import enum +from psutil.tests import get_test_subprocess +from psutil.tests import mock +from psutil.tests import reap_children +from psutil.tests import retry_before_failing +from psutil.tests import safe_remove +from psutil.tests import safe_rmdir +from psutil.tests import skip_on_access_denied +from psutil.tests import test_module_by_name +from psutil.tests import TESTFN +from psutil.tests import TESTFN_UNICODE +from psutil.tests import TRAVIS +from psutil.tests import unittest + + +# =================================================================== +# --- System-related API tests +# =================================================================== + + +class TestSystemAPIs(unittest.TestCase): + """Tests for system-related APIs.""" + + def setUp(self): + safe_remove(TESTFN) + + def tearDown(self): + reap_children() + + def test_process_iter(self): + self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) + sproc = get_test_subprocess() + self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) + p = psutil.Process(sproc.pid) + p.kill() + p.wait() + self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()]) + + with mock.patch('psutil.Process', + side_effect=psutil.NoSuchProcess(os.getpid())): + self.assertEqual(list(psutil.process_iter()), []) + with mock.patch('psutil.Process', + side_effect=psutil.AccessDenied(os.getpid())): + with self.assertRaises(psutil.AccessDenied): + list(psutil.process_iter()) + + def test_wait_procs(self): + def callback(p): + l.append(p.pid) + + l = [] + sproc1 = get_test_subprocess() + sproc2 = get_test_subprocess() + sproc3 = get_test_subprocess() + procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] + self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) + self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) + t = time.time() + gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback) + + self.assertLess(time.time() - t, 0.5) + self.assertEqual(gone, []) + self.assertEqual(len(alive), 3) + self.assertEqual(l, []) + for p in alive: + self.assertFalse(hasattr(p, 'returncode')) + + @retry_before_failing(30) + def test(procs, callback): + gone, alive = psutil.wait_procs(procs, timeout=0.03, + callback=callback) + self.assertEqual(len(gone), 1) + self.assertEqual(len(alive), 2) + return gone, alive + + sproc3.terminate() + gone, alive = test(procs, callback) + self.assertIn(sproc3.pid, [x.pid for x in gone]) + if POSIX: + self.assertEqual(gone.pop().returncode, signal.SIGTERM) + else: + self.assertEqual(gone.pop().returncode, 1) + self.assertEqual(l, [sproc3.pid]) + for p in alive: + self.assertFalse(hasattr(p, 'returncode')) + + @retry_before_failing(30) + def test(procs, callback): + gone, alive = psutil.wait_procs(procs, timeout=0.03, + callback=callback) + self.assertEqual(len(gone), 3) + self.assertEqual(len(alive), 0) + return gone, alive + + sproc1.terminate() + sproc2.terminate() + gone, alive = test(procs, callback) + self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid])) + for p in gone: + self.assertTrue(hasattr(p, 'returncode')) + + def test_wait_procs_no_timeout(self): + sproc1 = get_test_subprocess() + sproc2 = get_test_subprocess() + sproc3 = get_test_subprocess() + procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] + for p in procs: + p.terminate() + gone, alive = psutil.wait_procs(procs) + + def test_boot_time(self): + bt = psutil.boot_time() + self.assertIsInstance(bt, float) + self.assertGreater(bt, 0) + self.assertLess(bt, time.time()) + + @unittest.skipUnless(POSIX, 'posix only') + def test_PAGESIZE(self): + # pagesize is used internally to perform different calculations + # and it's determined by using SC_PAGE_SIZE; make sure + # getpagesize() returns the same value. + import resource + self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize()) + + def test_virtual_memory(self): + mem = psutil.virtual_memory() + assert mem.total > 0, mem + assert mem.available > 0, mem + assert 0 <= mem.percent <= 100, mem + assert mem.used > 0, mem + assert mem.free >= 0, mem + for name in mem._fields: + value = getattr(mem, name) + if name != 'percent': + self.assertIsInstance(value, (int, long)) + if name != 'total': + if not value >= 0: + self.fail("%r < 0 (%s)" % (name, value)) + if value > mem.total: + self.fail("%r > total (total=%s, %s=%s)" + % (name, mem.total, name, value)) + + def test_swap_memory(self): + mem = psutil.swap_memory() + assert mem.total >= 0, mem + assert mem.used >= 0, mem + if mem.total > 0: + # likely a system with no swap partition + assert mem.free > 0, mem + else: + assert mem.free == 0, mem + assert 0 <= mem.percent <= 100, mem + assert mem.sin >= 0, mem + assert mem.sout >= 0, mem + + def test_pid_exists(self): + sproc = get_test_subprocess(wait=True) + self.assertTrue(psutil.pid_exists(sproc.pid)) + p = psutil.Process(sproc.pid) + p.kill() + p.wait() + self.assertFalse(psutil.pid_exists(sproc.pid)) + self.assertFalse(psutil.pid_exists(-1)) + self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) + # pid 0 + psutil.pid_exists(0) == 0 in psutil.pids() + + def test_pid_exists_2(self): + reap_children() + pids = psutil.pids() + for pid in pids: + try: + assert psutil.pid_exists(pid) + except AssertionError: + # in case the process disappeared in meantime fail only + # if it is no longer in psutil.pids() + time.sleep(.1) + if pid in psutil.pids(): + self.fail(pid) + pids = range(max(pids) + 5000, max(pids) + 6000) + for pid in pids: + self.assertFalse(psutil.pid_exists(pid), msg=pid) + + def test_pids(self): + plist = [x.pid for x in psutil.process_iter()] + pidlist = psutil.pids() + self.assertEqual(plist.sort(), pidlist.sort()) + # make sure every pid is unique + self.assertEqual(len(pidlist), len(set(pidlist))) + + def test_test(self): + # test for psutil.test() function + stdout = sys.stdout + sys.stdout = DEVNULL + try: + psutil.test() + finally: + sys.stdout = stdout + + def test_cpu_count(self): + logical = psutil.cpu_count() + self.assertEqual(logical, len(psutil.cpu_times(percpu=True))) + self.assertGreaterEqual(logical, 1) + # + if os.path.exists("/proc/cpuinfo"): + with open("/proc/cpuinfo") as fd: + cpuinfo_data = fd.read() + if "physical id" not in cpuinfo_data: + raise unittest.SkipTest("cpuinfo doesn't include physical id") + physical = psutil.cpu_count(logical=False) + self.assertGreaterEqual(physical, 1) + self.assertGreaterEqual(logical, physical) + + def test_sys_cpu_times(self): + total = 0 + times = psutil.cpu_times() + sum(times) + for cp_time in times: + self.assertIsInstance(cp_time, float) + self.assertGreaterEqual(cp_time, 0.0) + total += cp_time + self.assertEqual(total, sum(times)) + str(times) + # CPU times are always supposed to increase over time + # or at least remain the same and that's because time + # cannot go backwards. + # Surprisingly sometimes this might not be the case (at + # least on Windows and Linux), see: + # https://github.com/giampaolo/psutil/issues/392 + # https://github.com/giampaolo/psutil/issues/645 + # if not WINDOWS: + # last = psutil.cpu_times() + # for x in range(100): + # new = psutil.cpu_times() + # for field in new._fields: + # new_t = getattr(new, field) + # last_t = getattr(last, field) + # self.assertGreaterEqual(new_t, last_t, + # msg="%s %s" % (new_t, last_t)) + # last = new + + def test_sys_cpu_times2(self): + t1 = sum(psutil.cpu_times()) + time.sleep(0.1) + t2 = sum(psutil.cpu_times()) + difference = t2 - t1 + if not difference >= 0.05: + self.fail("difference %s" % difference) + + def test_sys_per_cpu_times(self): + for times in psutil.cpu_times(percpu=True): + total = 0 + sum(times) + for cp_time in times: + self.assertIsInstance(cp_time, float) + self.assertGreaterEqual(cp_time, 0.0) + total += cp_time + self.assertEqual(total, sum(times)) + str(times) + self.assertEqual(len(psutil.cpu_times(percpu=True)[0]), + len(psutil.cpu_times(percpu=False))) + + # Note: in theory CPU times are always supposed to increase over + # time or remain the same but never go backwards. In practice + # sometimes this is not the case. + # This issue seemd to be afflict Windows: + # https://github.com/giampaolo/psutil/issues/392 + # ...but it turns out also Linux (rarely) behaves the same. + # last = psutil.cpu_times(percpu=True) + # for x in range(100): + # new = psutil.cpu_times(percpu=True) + # for index in range(len(new)): + # newcpu = new[index] + # lastcpu = last[index] + # for field in newcpu._fields: + # new_t = getattr(newcpu, field) + # last_t = getattr(lastcpu, field) + # self.assertGreaterEqual( + # new_t, last_t, msg="%s %s" % (lastcpu, newcpu)) + # last = new + + def test_sys_per_cpu_times_2(self): + tot1 = psutil.cpu_times(percpu=True) + stop_at = time.time() + 0.1 + while True: + if time.time() >= stop_at: + break + tot2 = psutil.cpu_times(percpu=True) + for t1, t2 in zip(tot1, tot2): + t1, t2 = sum(t1), sum(t2) + difference = t2 - t1 + if difference >= 0.05: + return + self.fail() + + def _test_cpu_percent(self, percent, last_ret, new_ret): + try: + self.assertIsInstance(percent, float) + self.assertGreaterEqual(percent, 0.0) + self.assertIsNot(percent, -0.0) + self.assertLessEqual(percent, 100.0 * psutil.cpu_count()) + except AssertionError as err: + raise AssertionError("\n%s\nlast=%s\nnew=%s" % ( + err, pprint.pformat(last_ret), pprint.pformat(new_ret))) + + def test_sys_cpu_percent(self): + last = psutil.cpu_percent(interval=0.001) + for x in range(100): + new = psutil.cpu_percent(interval=None) + self._test_cpu_percent(new, last, new) + last = new + + def test_sys_per_cpu_percent(self): + last = psutil.cpu_percent(interval=0.001, percpu=True) + self.assertEqual(len(last), psutil.cpu_count()) + for x in range(100): + new = psutil.cpu_percent(interval=None, percpu=True) + for percent in new: + self._test_cpu_percent(percent, last, new) + last = new + + def test_sys_cpu_times_percent(self): + last = psutil.cpu_times_percent(interval=0.001) + for x in range(100): + new = psutil.cpu_times_percent(interval=None) + for percent in new: + self._test_cpu_percent(percent, last, new) + self._test_cpu_percent(sum(new), last, new) + last = new + + def test_sys_per_cpu_times_percent(self): + last = psutil.cpu_times_percent(interval=0.001, percpu=True) + self.assertEqual(len(last), psutil.cpu_count()) + for x in range(100): + new = psutil.cpu_times_percent(interval=None, percpu=True) + for cpu in new: + for percent in cpu: + self._test_cpu_percent(percent, last, new) + self._test_cpu_percent(sum(cpu), last, new) + last = new + + def test_sys_per_cpu_times_percent_negative(self): + # see: https://github.com/giampaolo/psutil/issues/645 + psutil.cpu_times_percent(percpu=True) + zero_times = [x._make([0 for x in range(len(x._fields))]) + for x in psutil.cpu_times(percpu=True)] + with mock.patch('psutil.cpu_times', return_value=zero_times): + for cpu in psutil.cpu_times_percent(percpu=True): + for percent in cpu: + self._test_cpu_percent(percent, None, None) + + @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), + "os.statvfs() function not available on this platform") + def test_disk_usage(self): + usage = psutil.disk_usage(os.getcwd()) + assert usage.total > 0, usage + assert usage.used > 0, usage + assert usage.free > 0, usage + assert usage.total > usage.used, usage + assert usage.total > usage.free, usage + assert 0 <= usage.percent <= 100, usage.percent + if hasattr(shutil, 'disk_usage'): + # py >= 3.3, see: http://bugs.python.org/issue12442 + shutil_usage = shutil.disk_usage(os.getcwd()) + tolerance = 5 * 1024 * 1024 # 5MB + self.assertEqual(usage.total, shutil_usage.total) + self.assertAlmostEqual(usage.free, shutil_usage.free, + delta=tolerance) + self.assertAlmostEqual(usage.used, shutil_usage.used, + delta=tolerance) + + # if path does not exist OSError ENOENT is expected across + # all platforms + fname = tempfile.mktemp() + try: + psutil.disk_usage(fname) + except OSError as err: + if err.args[0] != errno.ENOENT: + raise + else: + self.fail("OSError not raised") + + @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), + "os.statvfs() function not available on this platform") + def test_disk_usage_unicode(self): + # see: https://github.com/giampaolo/psutil/issues/416 + # XXX this test is not really reliable as it always fails on + # Python 3.X (2.X is fine) + try: + safe_rmdir(TESTFN_UNICODE) + os.mkdir(TESTFN_UNICODE) + psutil.disk_usage(TESTFN_UNICODE) + safe_rmdir(TESTFN_UNICODE) + except UnicodeEncodeError: + pass + + @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), + "os.statvfs() function not available on this platform") + @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis") + def test_disk_partitions(self): + # all = False + ls = psutil.disk_partitions(all=False) + # on travis we get: + # self.assertEqual(p.cpu_affinity(), [n]) + # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0] + self.assertTrue(ls, msg=ls) + for disk in ls: + if WINDOWS and 'cdrom' in disk.opts: + continue + if not POSIX: + assert os.path.exists(disk.device), disk + else: + # we cannot make any assumption about this, see: + # http://goo.gl/p9c43 + disk.device + if SUNOS: + # on solaris apparently mount points can also be files + assert os.path.exists(disk.mountpoint), disk + else: + assert os.path.isdir(disk.mountpoint), disk + assert disk.fstype, disk + self.assertIsInstance(disk.opts, str) + + # all = True + ls = psutil.disk_partitions(all=True) + self.assertTrue(ls, msg=ls) + for disk in psutil.disk_partitions(all=True): + if not WINDOWS: + try: + os.stat(disk.mountpoint) + except OSError as err: + if TRAVIS and OSX and err.errno == errno.EIO: + continue + # http://mail.python.org/pipermail/python-dev/ + # 2012-June/120787.html + if err.errno not in (errno.EPERM, errno.EACCES): + raise + else: + if SUNOS: + # on solaris apparently mount points can also be files + assert os.path.exists(disk.mountpoint), disk + else: + assert os.path.isdir(disk.mountpoint), disk + self.assertIsInstance(disk.fstype, str) + self.assertIsInstance(disk.opts, str) + + def find_mount_point(path): + path = os.path.abspath(path) + while not os.path.ismount(path): + path = os.path.dirname(path) + return path + + mount = find_mount_point(__file__) + mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] + self.assertIn(mount, mounts) + psutil.disk_usage(mount) + + @skip_on_access_denied() + def test_net_connections(self): + def check(cons, families, types_): + for conn in cons: + self.assertIn(conn.family, families, msg=conn) + if conn.family != getattr(socket, 'AF_UNIX', object()): + self.assertIn(conn.type, types_, msg=conn) + + from psutil._common import conn_tmap + for kind, groups in conn_tmap.items(): + if SUNOS and kind == 'unix': + continue + families, types_ = groups + cons = psutil.net_connections(kind) + self.assertEqual(len(cons), len(set(cons))) + check(cons, families, types_) + + def test_net_io_counters(self): + def check_ntuple(nt): + self.assertEqual(nt[0], nt.bytes_sent) + self.assertEqual(nt[1], nt.bytes_recv) + self.assertEqual(nt[2], nt.packets_sent) + self.assertEqual(nt[3], nt.packets_recv) + self.assertEqual(nt[4], nt.errin) + self.assertEqual(nt[5], nt.errout) + self.assertEqual(nt[6], nt.dropin) + self.assertEqual(nt[7], nt.dropout) + assert nt.bytes_sent >= 0, nt + assert nt.bytes_recv >= 0, nt + assert nt.packets_sent >= 0, nt + assert nt.packets_recv >= 0, nt + assert nt.errin >= 0, nt + assert nt.errout >= 0, nt + assert nt.dropin >= 0, nt + assert nt.dropout >= 0, nt + + ret = psutil.net_io_counters(pernic=False) + check_ntuple(ret) + ret = psutil.net_io_counters(pernic=True) + self.assertNotEqual(ret, []) + for key in ret: + self.assertTrue(key) + check_ntuple(ret[key]) + + def test_net_if_addrs(self): + nics = psutil.net_if_addrs() + assert nics, nics + + # Not reliable on all platforms (net_if_addrs() reports more + # interfaces). + # self.assertEqual(sorted(nics.keys()), + # sorted(psutil.net_io_counters(pernic=True).keys())) + + families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK]) + for nic, addrs in nics.items(): + self.assertEqual(len(set(addrs)), len(addrs)) + for addr in addrs: + self.assertIsInstance(addr.family, int) + self.assertIsInstance(addr.address, str) + self.assertIsInstance(addr.netmask, (str, type(None))) + self.assertIsInstance(addr.broadcast, (str, type(None))) + self.assertIn(addr.family, families) + if sys.version_info >= (3, 4): + self.assertIsInstance(addr.family, enum.IntEnum) + if addr.family == socket.AF_INET: + s = socket.socket(addr.family) + with contextlib.closing(s): + s.bind((addr.address, 0)) + elif addr.family == socket.AF_INET6: + info = socket.getaddrinfo( + addr.address, 0, socket.AF_INET6, socket.SOCK_STREAM, + 0, socket.AI_PASSIVE)[0] + af, socktype, proto, canonname, sa = info + s = socket.socket(af, socktype, proto) + with contextlib.closing(s): + s.bind(sa) + for ip in (addr.address, addr.netmask, addr.broadcast, + addr.ptp): + if ip is not None: + # TODO: skip AF_INET6 for now because I get: + # AddressValueError: Only hex digits permitted in + # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0' + if addr.family != AF_INET6: + check_ip_address(ip, addr.family) + # broadcast and ptp addresses are mutually exclusive + if addr.broadcast: + self.assertIsNone(addr.ptp) + elif addr.ptp: + self.assertIsNone(addr.broadcast) + + if BSD or OSX or SUNOS: + if hasattr(socket, "AF_LINK"): + self.assertEqual(psutil.AF_LINK, socket.AF_LINK) + elif LINUX: + self.assertEqual(psutil.AF_LINK, socket.AF_PACKET) + elif WINDOWS: + self.assertEqual(psutil.AF_LINK, -1) + + @unittest.skipIf(TRAVIS, "EPERM on travis") + def test_net_if_stats(self): + nics = psutil.net_if_stats() + assert nics, nics + all_duplexes = (psutil.NIC_DUPLEX_FULL, + psutil.NIC_DUPLEX_HALF, + psutil.NIC_DUPLEX_UNKNOWN) + for nic, stats in nics.items(): + isup, duplex, speed, mtu = stats + self.assertIsInstance(isup, bool) + self.assertIn(duplex, all_duplexes) + self.assertIn(duplex, all_duplexes) + self.assertGreaterEqual(speed, 0) + self.assertGreaterEqual(mtu, 0) + + @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), + '/proc/diskstats not available on this linux version') + @unittest.skipIf(APPVEYOR, + "can't find any physical disk on Appveyor") + def test_disk_io_counters(self): + def check_ntuple(nt): + self.assertEqual(nt[0], nt.read_count) + self.assertEqual(nt[1], nt.write_count) + self.assertEqual(nt[2], nt.read_bytes) + self.assertEqual(nt[3], nt.write_bytes) + self.assertEqual(nt[4], nt.read_time) + self.assertEqual(nt[5], nt.write_time) + assert nt.read_count >= 0, nt + assert nt.write_count >= 0, nt + assert nt.read_bytes >= 0, nt + assert nt.write_bytes >= 0, nt + assert nt.read_time >= 0, nt + assert nt.write_time >= 0, nt + + ret = psutil.disk_io_counters(perdisk=False) + check_ntuple(ret) + ret = psutil.disk_io_counters(perdisk=True) + # make sure there are no duplicates + self.assertEqual(len(ret), len(set(ret))) + for key in ret: + assert key, key + check_ntuple(ret[key]) + if LINUX and key[-1].isdigit(): + # if 'sda1' is listed 'sda' shouldn't, see: + # https://github.com/giampaolo/psutil/issues/338 + while key[-1].isdigit(): + key = key[:-1] + self.assertNotIn(key, ret.keys()) + + def test_users(self): + users = psutil.users() + if not APPVEYOR: + self.assertNotEqual(users, []) + for user in users: + assert user.name, user + user.terminal + user.host + assert user.started > 0.0, user + datetime.datetime.fromtimestamp(user.started) + + +if __name__ == '__main__': + test_module_by_name(__file__) diff --git a/test/_windows.py b/psutil/tests/test_windows.py similarity index 96% rename from test/_windows.py rename to psutil/tests/test_windows.py index ce654881b..3bd3aff25 100644 --- a/test/_windows.py +++ b/psutil/tests/test_windows.py @@ -5,7 +5,7 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -"""Windows specific tests. These are implicitly run by test_psutil.py.""" +"""Windows specific tests.""" import errno import glob @@ -17,8 +17,6 @@ import time import traceback -import mock - try: import wmi except ImportError: @@ -34,11 +32,13 @@ from psutil._compat import callable from psutil._compat import long from psutil._compat import PY3 -from test_psutil import APPVEYOR -from test_psutil import get_test_subprocess -from test_psutil import reap_children -from test_psutil import retry_before_failing -from test_psutil import unittest +from psutil.tests import APPVEYOR +from psutil.tests import get_test_subprocess +from psutil.tests import reap_children +from psutil.tests import retry_before_failing +from psutil.tests import test_module_by_name +from psutil.tests import unittest +from psutil.tests import mock cext = psutil._psplatform.cext @@ -488,6 +488,7 @@ def test_zombies(self): self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) +@unittest.skipUnless(WINDOWS, "not a Windows system") class RemoteProcessTestCase(unittest.TestCase): """Certain functions require calling ReadProcessMemory. This trivially works when called on the current process. Check that this works on other @@ -575,15 +576,5 @@ def test_environ_64(self): self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid())) -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) - test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation)) - test_suite.addTest(unittest.makeSuite(RemoteProcessTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - - if __name__ == '__main__': - if not main(): - sys.exit(1) + test_module_by_name(__file__) diff --git a/setup.py b/setup.py index 35582c194..b0b8eb6dd 100644 --- a/setup.py +++ b/setup.py @@ -229,7 +229,7 @@ def main(): url='https://github.com/giampaolo/psutil', platforms='Platform Independent', license='BSD', - packages=['psutil'], + packages=['psutil', 'psutil.tests'], # see: python setup.py register --list-classifiers classifiers=[ 'Development Status :: 5 - Production/Stable', diff --git a/test/README.rst b/test/README.rst deleted file mode 100644 index 3f2a468ef..000000000 --- a/test/README.rst +++ /dev/null @@ -1,21 +0,0 @@ -- The recommended way to run tests (also on Windows) is to cd into parent - directory and run ``make test`` - -- Dependencies for running tests: - - python 2.6: ipaddress, mock, unittest2 - - python 2.7: ipaddress, mock - - python 3.2: ipaddress, mock - - python 3.3: ipaddress - - python >= 3.4: no deps required - -- The main test script is ``test_psutil.py``, which also imports platform-specific - ``_*.py`` scripts (which should be ignored). - -- ``test_memory_leaks.py`` looks for memory leaks into C extension modules and must - be run separately with ``make test-memleaks``. - -- To run tests on all supported Python version install tox (pip install tox) - then run ``tox``. - -- Every time a commit is pushed tests are automatically run on Travis: - https://travis-ci.org/giampaolo/psutil/