From 1a6340785242730cfc87fedb31700cb47f9c531c Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Thu, 19 Dec 2024 14:17:36 +0100 Subject: [PATCH] use a `set` literal when testing for membership --- psutil/__init__.py | 2 +- psutil/_common.py | 8 ++++---- psutil/_compat.py | 2 +- psutil/_psbsd.py | 2 +- psutil/_pslinux.py | 8 ++++---- psutil/_pssunos.py | 6 +++--- psutil/_pswindows.py | 24 ++++++++++++------------ psutil/tests/__init__.py | 18 +++++++++--------- psutil/tests/test_connections.py | 14 +++++++------- psutil/tests/test_linux.py | 2 +- psutil/tests/test_misc.py | 4 ++-- psutil/tests/test_posix.py | 2 +- psutil/tests/test_process.py | 12 ++++++------ psutil/tests/test_process_all.py | 4 ++-- psutil/tests/test_system.py | 8 ++++---- psutil/tests/test_windows.py | 4 ++-- pyproject.toml | 1 - scripts/top.py | 2 +- 18 files changed, 61 insertions(+), 62 deletions(-) diff --git a/psutil/__init__.py b/psutil/__init__.py index 96aa195d2..564833938 100644 --- a/psutil/__init__.py +++ b/psutil/__init__.py @@ -414,7 +414,7 @@ def __str__(self): except AccessDenied: pass - if self._exitcode not in (_SENTINEL, None): + if self._exitcode not in {_SENTINEL, None}: info["exitcode"] = self._exitcode if self._create_time is not None: info['started'] = _pprint_secs(self._create_time) diff --git a/psutil/_common.py b/psutil/_common.py index 9fd7b0cfb..4b99b093e 100644 --- a/psutil/_common.py +++ b/psutil/_common.py @@ -547,7 +547,7 @@ def isfile_strict(path): try: st = os.stat(path) except OSError as err: - if err.errno in (errno.EPERM, errno.EACCES): + if err.errno in {errno.EPERM, errno.EACCES}: raise return False else: @@ -562,7 +562,7 @@ def path_exists_strict(path): try: os.stat(path) except OSError as err: - if err.errno in (errno.EPERM, errno.EACCES): + if err.errno in {errno.EPERM, errno.EACCES}: raise return False else: @@ -639,12 +639,12 @@ def socktype_to_enum(num): def conn_to_ntuple(fd, fam, type_, laddr, raddr, status, status_map, pid=None): """Convert a raw connection tuple to a proper ntuple.""" - if fam in (socket.AF_INET, AF_INET6): + if fam in {socket.AF_INET, AF_INET6}: if laddr: laddr = addr(*laddr) if raddr: raddr = addr(*raddr) - if type_ == socket.SOCK_STREAM and fam in (AF_INET, AF_INET6): + if type_ == socket.SOCK_STREAM and fam in {AF_INET, AF_INET6}: status = status_map.get(status, CONN_NONE) else: status = CONN_NONE # ignore whatever C returned to us diff --git a/psutil/_compat.py b/psutil/_compat.py index 9631a7c50..92e01713c 100644 --- a/psutil/_compat.py +++ b/psutil/_compat.py @@ -180,7 +180,7 @@ def ProcessLookupError(inst): @_instance_checking_exception(EnvironmentError) def PermissionError(inst): - return getattr(inst, 'errno', _SENTINEL) in (errno.EACCES, errno.EPERM) + return getattr(inst, 'errno', _SENTINEL) in {errno.EACCES, errno.EPERM} @_instance_checking_exception(EnvironmentError) def InterruptedError(inst): diff --git a/psutil/_psbsd.py b/psutil/_psbsd.py index deffe50b0..77b99c0fd 100644 --- a/psutil/_psbsd.py +++ b/psutil/_psbsd.py @@ -958,7 +958,7 @@ def cpu_affinity_set(self, cpus): # <> - if err.errno in (errno.EINVAL, errno.EDEADLK): + if err.errno in {errno.EINVAL, errno.EDEADLK}: for cpu in cpus: if cpu not in allcpus: raise ValueError( diff --git a/psutil/_pslinux.py b/psutil/_pslinux.py index e1e9bb076..dd2a76bae 100644 --- a/psutil/_pslinux.py +++ b/psutil/_pslinux.py @@ -1040,7 +1040,7 @@ def retrieve(self, kind, pid=None): ret = set() for proto_name, family, type_ in self.tmap[kind]: path = "%s/net/%s" % (self._procfs_path, proto_name) - if family in (socket.AF_INET, socket.AF_INET6): + if family in {socket.AF_INET, socket.AF_INET6}: ls = self.process_inet( path, family, type_, inodes, filter_pid=pid ) @@ -1358,7 +1358,7 @@ def disk_partitions(all=False): device, mountpoint, fstype, opts = partition if device == 'none': device = '' - if device in ("/dev/root", "rootfs"): + if device in {"/dev/root", "rootfs"}: device = RootFsDeviceFinder().find() or device if not all: if not device or fstype not in fstypes: @@ -1589,7 +1589,7 @@ def multi_bcat(*paths): status = cat(root + "/status", fallback="").strip().lower() if status == "discharging": power_plugged = False - elif status in ("charging", "full"): + elif status in {"charging", "full"}: power_plugged = True # Seconds left. @@ -2256,7 +2256,7 @@ def ionice_get(self): def ionice_set(self, ioclass, value): if value is None: value = 0 - if value and ioclass in (IOPRIO_CLASS_IDLE, IOPRIO_CLASS_NONE): + if value and ioclass in {IOPRIO_CLASS_IDLE, IOPRIO_CLASS_NONE}: raise ValueError("%r ioclass accepts no value" % ioclass) if value < 0 or value > 7: msg = "value not in 0-7 range" diff --git a/psutil/_pssunos.py b/psutil/_pssunos.py index 5536d3507..a38d939d7 100644 --- a/psutil/_pssunos.py +++ b/psutil/_pssunos.py @@ -283,7 +283,7 @@ def net_connections(kind, _pid=-1): if type_ not in types: continue # TODO: refactor and use _common.conn_to_ntuple. - if fam in (AF_INET, AF_INET6): + if fam in {AF_INET, AF_INET6}: if laddr: laddr = _common.addr(*laddr) if raddr: @@ -476,7 +476,7 @@ def nice_get(self): @wrap_exceptions def nice_set(self, value): - if self.pid in (2, 3): + if self.pid in {2, 3}: # Special case PIDs: internally setpriority(3) return ESRCH # (no such process), no matter what. # The process actually exists though, as it has a name, @@ -678,7 +678,7 @@ def net_connections(self, kind='inet'): os.stat('%s/%s' % (self._procfs_path, self.pid)) # UNIX sockets - if kind in ('all', 'unix'): + if kind in {'all', 'unix'}: ret.extend([ _common.pconn(*conn) for conn in self._get_unix_sockets(self.pid) diff --git a/psutil/_pswindows.py b/psutil/_pswindows.py index e4550c391..dfadbf493 100644 --- a/psutil/_pswindows.py +++ b/psutil/_pswindows.py @@ -575,10 +575,10 @@ def _wrap_exceptions(self): % self._name ) raise AccessDenied(pid=None, name=self._name, msg=msg) - elif err.winerror in ( + elif err.winerror in { cext.ERROR_INVALID_NAME, cext.ERROR_SERVICE_DOES_NOT_EXIST, - ): + }: msg = "service %r does not exist" % self._name raise NoSuchProcess(pid=None, name=self._name, msg=msg) else: @@ -697,15 +697,15 @@ def as_dict(self): def is_permission_err(exc): """Return True if this is a permission error.""" assert isinstance(exc, OSError), exc - if exc.errno in (errno.EPERM, errno.EACCES): + if exc.errno in {errno.EPERM, errno.EACCES}: return True # On Python 2 OSError doesn't always have 'winerror'. Sometimes # it does, in which case the original exception was WindowsError # (which is a subclass of OSError). - return getattr(exc, "winerror", -1) in ( + return getattr(exc, "winerror", -1) in { cext.ERROR_ACCESS_DENIED, cext.ERROR_PRIVILEGE_NOT_HELD, - ) + } def convert_oserror(exc, pid=None, name=None): @@ -919,10 +919,10 @@ def send_signal(self, sig): if sig == signal.SIGTERM: cext.proc_kill(self.pid) # py >= 2.7 - elif sig in ( + elif sig in { getattr(signal, "CTRL_C_EVENT", object()), getattr(signal, "CTRL_BREAK_EVENT", object()), - ): + }: os.kill(self.pid, sig) else: msg = ( @@ -976,7 +976,7 @@ def wait(self, timeout=None): @wrap_exceptions def username(self): - if self.pid in (0, 4): + if self.pid in {0, 4}: return 'NT AUTHORITY\\SYSTEM' domain, user = cext.proc_username(self.pid) return py2_strencode(domain) + '\\' + py2_strencode(user) @@ -1034,7 +1034,7 @@ def resume(self): @wrap_exceptions @retry_error_partial_copy def cwd(self): - if self.pid in (0, 4): + if self.pid in {0, 4}: raise AccessDenied(self.pid, self._name) # return a normalized pathname since the native C function appends # "\\" at the and of the path @@ -1043,7 +1043,7 @@ def cwd(self): @wrap_exceptions def open_files(self): - if self.pid in (0, 4): + if self.pid in {0, 4}: return [] ret = set() # Filenames come in in native format like: @@ -1087,12 +1087,12 @@ def ionice_set(self, ioclass, value): if value: msg = "value argument not accepted on Windows" raise TypeError(msg) - if ioclass not in ( + if ioclass not in { IOPRIO_VERYLOW, IOPRIO_LOW, IOPRIO_NORMAL, IOPRIO_HIGH, - ): + }: raise ValueError("%s is not a valid priority" % ioclass) cext.proc_io_priority_set(self.pid, ioclass) diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py index dd3d53647..f2cceeac5 100644 --- a/psutil/tests/__init__.py +++ b/psutil/tests/__init__.py @@ -205,7 +205,7 @@ def macos_version(): INVALID_UNICODE_SUFFIX = b"f\xc0\x80".decode('utf8', 'surrogateescape') else: INVALID_UNICODE_SUFFIX = "f\xc0\x80" -ASCII_FS = sys.getfilesystemencoding().lower() in ('ascii', 'us-ascii') +ASCII_FS = sys.getfilesystemencoding().lower() in {'ascii', 'us-ascii'} # --- paths @@ -1739,11 +1739,11 @@ def get_free_port(host='127.0.0.1'): def bind_socket(family=AF_INET, type=SOCK_STREAM, addr=None): """Binds a generic socket.""" - if addr is None and family in (AF_INET, AF_INET6): + if addr is None and family in {AF_INET, AF_INET6}: addr = ("", 0) sock = socket.socket(family, type) try: - if os.name not in ('nt', 'cygwin'): + if os.name not in {'nt', 'cygwin'}: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(addr) if type == socket.SOCK_STREAM: @@ -1874,7 +1874,7 @@ def check_connection_ntuple(conn): def check_ntuple(conn): has_pid = len(conn) == 7 - assert len(conn) in (6, 7), len(conn) + assert len(conn) in {6, 7}, len(conn) assert conn[0] == conn.fd, conn.fd assert conn[1] == conn.family, conn.family assert conn[2] == conn.type, conn.type @@ -1885,7 +1885,7 @@ def check_ntuple(conn): assert conn[6] == conn.pid, conn.pid def check_family(conn): - assert conn.family in (AF_INET, AF_INET6, AF_UNIX), conn.family + assert conn.family in {AF_INET, AF_INET6, AF_UNIX}, conn.family if enum is not None: assert isinstance(conn.family, enum.IntEnum), conn else: @@ -1908,11 +1908,11 @@ def check_family(conn): def check_type(conn): # SOCK_SEQPACKET may happen in case of AF_UNIX socks SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object()) - assert conn.type in ( + assert conn.type in { socket.SOCK_STREAM, socket.SOCK_DGRAM, SOCK_SEQPACKET, - ), conn.type + }, conn.type if enum is not None: assert isinstance(conn.type, enum.IntEnum), conn else: @@ -1923,7 +1923,7 @@ def check_type(conn): def check_addrs(conn): # check IP address and port sanity for addr in (conn.laddr, conn.raddr): - if conn.family in (AF_INET, AF_INET6): + if conn.family in {AF_INET, AF_INET6}: assert isinstance(addr, tuple), type(addr) if not addr: continue @@ -1939,7 +1939,7 @@ def check_status(conn): getattr(psutil, x) for x in dir(psutil) if x.startswith('CONN_') ] assert conn.status in valids, conn.status - if conn.family in (AF_INET, AF_INET6) and conn.type == SOCK_STREAM: + if conn.family in {AF_INET, AF_INET6} and conn.type == SOCK_STREAM: assert conn.status != psutil.CONN_NONE, conn.status else: assert conn.status == psutil.CONN_NONE, conn.status diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py index c9256a17f..bca12ff4b 100755 --- a/psutil/tests/test_connections.py +++ b/psutil/tests/test_connections.py @@ -49,7 +49,7 @@ def this_proc_net_connections(kind): cons = psutil.Process().net_connections(kind=kind) - if kind in ("all", "unix"): + if kind in {"all", "unix"}: return filter_proc_net_connections(cons) return cons @@ -430,7 +430,7 @@ def test_count(self): cons = this_proc_net_connections(kind='tcp') assert len(cons) == (2 if supports_ipv6() else 1) for conn in cons: - assert conn.family in (AF_INET, AF_INET6) + assert conn.family in {AF_INET, AF_INET6} assert conn.type == SOCK_STREAM # tcp4 cons = this_proc_net_connections(kind='tcp4') @@ -447,7 +447,7 @@ def test_count(self): cons = this_proc_net_connections(kind='udp') assert len(cons) == (2 if supports_ipv6() else 1) for conn in cons: - assert conn.family in (AF_INET, AF_INET6) + assert conn.family in {AF_INET, AF_INET6} assert conn.type == SOCK_DGRAM # udp4 cons = this_proc_net_connections(kind='udp4') @@ -464,15 +464,15 @@ def test_count(self): cons = this_proc_net_connections(kind='inet') assert len(cons) == (4 if supports_ipv6() else 2) for conn in cons: - assert conn.family in (AF_INET, AF_INET6) - assert conn.type in (SOCK_STREAM, SOCK_DGRAM) + assert conn.family in {AF_INET, AF_INET6} + assert conn.type in {SOCK_STREAM, SOCK_DGRAM} # inet6 if supports_ipv6(): cons = this_proc_net_connections(kind='inet6') assert len(cons) == 2 for conn in cons: assert conn.family == AF_INET6 - assert conn.type in (SOCK_STREAM, SOCK_DGRAM) + assert conn.type in {SOCK_STREAM, SOCK_DGRAM} # Skipped on BSD becayse by default the Python process # creates a UNIX socket to '/var/run/log'. if HAS_NET_CONNECTIONS_UNIX and not (FREEBSD or NETBSD): @@ -480,7 +480,7 @@ def test_count(self): assert len(cons) == 3 for conn in cons: assert conn.family == AF_UNIX - assert conn.type in (SOCK_STREAM, SOCK_DGRAM) + assert conn.type in {SOCK_STREAM, SOCK_DGRAM} @pytest.mark.skipif(SKIP_SYSCONS, reason="requires root") diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py index ea9dff952..15eaf5e2e 100755 --- a/psutil/tests/test_linux.py +++ b/psutil/tests/test_linux.py @@ -1556,7 +1556,7 @@ def test_users(self): # Make sure the C extension converts ':0' and ':0.0' to # 'localhost'. for user in psutil.users(): - assert user.host not in (":0", ":0.0") + assert user.host not in {":0", ":0.0"} def test_procfs_path(self): tdir = self.get_testfn() diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py index ca189a97b..cf98f8b4b 100755 --- a/psutil/tests/test_misc.py +++ b/psutil/tests/test_misc.py @@ -215,14 +215,14 @@ class TestMisc(PsutilTestCase): def test__all__(self): dir_psutil = dir(psutil) for name in dir_psutil: - if name in ( + if name in { 'debug', 'long', 'tests', 'test', 'PermissionError', 'ProcessLookupError', - ): + }: continue if not name.startswith('_'): try: diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py index 6f7f9790f..12afa6192 100755 --- a/psutil/tests/test_posix.py +++ b/psutil/tests/test_posix.py @@ -280,7 +280,7 @@ def test_create_time(self): round_time_psutil_tstamp = datetime.datetime.fromtimestamp( round_time_psutil ).strftime("%H:%M:%S") - assert time_ps in [time_psutil_tstamp, round_time_psutil_tstamp] + assert time_ps in {time_psutil_tstamp, round_time_psutil_tstamp} def test_exe(self): ps_pathname = ps_name(self.pid) diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py index dec62baf7..bdf860a10 100755 --- a/psutil/tests/test_process.py +++ b/psutil/tests/test_process.py @@ -687,7 +687,7 @@ def test_memory_maps(self): value = getattr(nt, fname) if fname == 'path': continue - if fname in ('addr', 'perms'): + if fname in {'addr', 'perms'}: assert value, value else: assert isinstance(value, (int, long)) @@ -923,11 +923,11 @@ def cleanup(init): # even if the function succeeds. For higher # priorities, we match either the expected # value or the highest so far. - if prio in ( + if prio in { psutil.ABOVE_NORMAL_PRIORITY_CLASS, psutil.HIGH_PRIORITY_CLASS, psutil.REALTIME_PRIORITY_CLASS, - ): + }: if new_prio == prio or highest_prio is None: highest_prio = prio assert new_prio == highest_prio @@ -1390,12 +1390,12 @@ def assert_raises_nsp(fun, fun_name): except psutil.NoSuchProcess: pass except psutil.AccessDenied: - if OPENBSD and fun_name in ('threads', 'num_threads'): + if OPENBSD and fun_name in {'threads', 'num_threads'}: return raise else: # NtQuerySystemInformation succeeds even if process is gone. - if WINDOWS and fun_name in ('exe', 'name'): + if WINDOWS and fun_name in {'exe', 'name'}: return raise self.fail( "%r didn't raise NSP and returned %r instead" % (fun, ret) @@ -1520,7 +1520,7 @@ def test_pid_0(self): except psutil.AccessDenied: pass else: - if name in ("uids", "gids"): + if name in {"uids", "gids"}: assert ret.real == 0 elif name == "username": user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root' diff --git a/psutil/tests/test_process_all.py b/psutil/tests/test_process_all.py index 1550046ba..780ea194b 100755 --- a/psutil/tests/test_process_all.py +++ b/psutil/tests/test_process_all.py @@ -321,7 +321,7 @@ def memory_full_info(self, ret, info): value = getattr(ret, name) assert isinstance(value, (int, long)) assert value >= 0 - if LINUX or (OSX and name in ('vms', 'data')): + if LINUX or (OSX and name in {'vms', 'data'}): # On Linux there are processes (e.g. 'goa-daemon') whose # VMS is incredibly high for some reason. continue @@ -343,7 +343,7 @@ def open_files(self, ret, info): assert isinstance(f.mode, str) assert isinstance(f.flags, int) assert f.position >= 0 - assert f.mode in ('r', 'w', 'a', 'r+', 'a+') + assert f.mode in {'r', 'w', 'a', 'r+', 'a+'} assert f.flags > 0 elif BSD and not f.path: # XXX see: https://github.com/giampaolo/psutil/issues/595 diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py index 68074ae14..e4dfe79d6 100755 --- a/psutil/tests/test_system.py +++ b/psutil/tests/test_system.py @@ -589,7 +589,7 @@ def test_cpu_stats(self): value = getattr(infos, name) assert value >= 0 # on AIX, ctx_switches is always 0 - if not AIX and name in ('ctx_switches', 'interrupts'): + if not AIX and name in {'ctx_switches', 'interrupts'}: assert value > 0 # TODO: remove this once 1892 is fixed @@ -703,7 +703,7 @@ def check_ntuple(nt): continue # http://mail.python.org/pipermail/python-dev/ # 2012-June/120787.html - if err.errno not in (errno.EPERM, errno.EACCES): + if err.errno not in {errno.EPERM, errno.EACCES}: raise else: assert os.path.exists(disk.mountpoint), disk @@ -965,10 +965,10 @@ def test_sensors_battery(self): ret = psutil.sensors_battery() assert ret.percent >= 0 assert ret.percent <= 100 - if ret.secsleft not in ( + if ret.secsleft not in { psutil.POWER_TIME_UNKNOWN, psutil.POWER_TIME_UNLIMITED, - ): + }: assert ret.secsleft >= 0 else: if ret.secsleft == psutil.POWER_TIME_UNLIMITED: diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py index b11b0e706..161e2f35d 100755 --- a/psutil/tests/test_windows.py +++ b/psutil/tests/test_windows.py @@ -401,7 +401,7 @@ def test_special_pid(self): rss, _vms = p.memory_info()[:2] except psutil.AccessDenied: # expected on Windows Vista and Windows 7 - if platform.uname()[1] not in ('vista', 'win-7', 'win7'): + if platform.uname()[1] not in {'vista', 'win-7', 'win7'}: raise else: assert rss > 0 @@ -637,7 +637,7 @@ def test_memory_vms(self): # bytes but funnily enough on certain platforms bytes are # returned instead. wmi_usage = int(w.PageFileUsage) - if vms not in (wmi_usage, wmi_usage * 1024): + if vms not in {wmi_usage, wmi_usage * 1024}: raise self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) def test_create_time(self): diff --git a/pyproject.toml b/pyproject.toml index 18bfbfa28..429c4a788 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,7 +73,6 @@ ignore = [ "PLR1704", # Redefining argument with the local name `type_` "PLR2004", # Magic value used in comparison, consider replacing X with a constant variable "PLR5501", # Use `elif` instead of `else` then `if`, to reduce indentation - "PLR6201", # Use a `set` literal when testing for membership "PLR6301", # Method `x` could be a function, class method, or static method "PLW0603", # Using the global statement to update `lineno` is discouraged "PLW1514", # `open` in text mode without explicit `encoding` argument diff --git a/scripts/top.py b/scripts/top.py index c0687ae1f..db206e896 100755 --- a/scripts/top.py +++ b/scripts/top.py @@ -161,7 +161,7 @@ def get_dashes(perc): for x, y in procs_status.items(): if y: st.append("%s=%s" % (x, y)) - st.sort(key=lambda x: x[:3] in ('run', 'sle'), reverse=1) + st.sort(key=lambda x: x[:3] in {'run', 'sle'}, reverse=1) printl(" Processes: %s (%s)" % (num_procs, ', '.join(st))) # load average, uptime uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp(