增加JAVA环境配置
增加VC运行库安装
增加检测升级功能
更新PY模块
This commit is contained in:
2022-01-05 03:12:34 +08:00
parent f3ce17779b
commit 3ece9fa803
55 changed files with 827 additions and 1776 deletions
+90 -73
View File
@@ -21,6 +21,7 @@ Works with Python versions from 2.6 to 3.4+.
"""
from __future__ import division
import collections
import contextlib
import datetime
@@ -31,24 +32,16 @@ import subprocess
import sys
import threading
import time
try:
import pwd
except ImportError:
pwd = None
from . import _common
from ._common import AccessDenied
from ._common import Error
from ._common import memoize_when_activated
from ._common import NoSuchProcess
from ._common import TimeoutExpired
from ._common import wrap_numbers as _wrap_numbers
from ._common import ZombieProcess
from ._compat import long
from ._compat import PermissionError
from ._compat import ProcessLookupError
from ._compat import PY3 as _PY3
from ._common import AIX
from ._common import BSD
from ._common import CONN_CLOSE
from ._common import CONN_CLOSE_WAIT
from ._common import CONN_CLOSING
@@ -61,9 +54,16 @@ from ._common import CONN_NONE
from ._common import CONN_SYN_RECV
from ._common import CONN_SYN_SENT
from ._common import CONN_TIME_WAIT
from ._common import FREEBSD # NOQA
from ._common import LINUX
from ._common import MACOS
from ._common import NETBSD # NOQA
from ._common import NIC_DUPLEX_FULL
from ._common import NIC_DUPLEX_HALF
from ._common import NIC_DUPLEX_UNKNOWN
from ._common import OPENBSD # NOQA
from ._common import OSX # deprecated alias
from ._common import POSIX # NOQA
from ._common import POWER_TIME_UNKNOWN
from ._common import POWER_TIME_UNLIMITED
from ._common import STATUS_DEAD
@@ -78,18 +78,21 @@ from ._common import STATUS_TRACING_STOP
from ._common import STATUS_WAITING
from ._common import STATUS_WAKING
from ._common import STATUS_ZOMBIE
from ._common import AIX
from ._common import BSD
from ._common import FREEBSD # NOQA
from ._common import LINUX
from ._common import MACOS
from ._common import NETBSD # NOQA
from ._common import OPENBSD # NOQA
from ._common import OSX # deprecated alias
from ._common import POSIX # NOQA
from ._common import SUNOS
from ._common import WINDOWS
from ._common import AccessDenied
from ._common import Error
from ._common import NoSuchProcess
from ._common import TimeoutExpired
from ._common import ZombieProcess
from ._common import memoize_when_activated
from ._common import wrap_numbers as _wrap_numbers
from ._compat import PY3 as _PY3
from ._compat import PermissionError
from ._compat import ProcessLookupError
from ._compat import SubprocessTimeoutExpired as _SubprocessTimeoutExpired
from ._compat import long
if LINUX:
# This is public API and it will be retrieved from _pslinux.py
@@ -97,7 +100,6 @@ if LINUX:
PROCFS_PATH = "/proc"
from . import _pslinux as _psplatform
from ._pslinux import IOPRIO_CLASS_BE # NOQA
from ._pslinux import IOPRIO_CLASS_IDLE # NOQA
from ._pslinux import IOPRIO_CLASS_NONE # NOQA
@@ -112,10 +114,10 @@ elif WINDOWS:
from ._psutil_windows import NORMAL_PRIORITY_CLASS # NOQA
from ._psutil_windows import REALTIME_PRIORITY_CLASS # NOQA
from ._pswindows import CONN_DELETE_TCB # NOQA
from ._pswindows import IOPRIO_VERYLOW # NOQA
from ._pswindows import IOPRIO_HIGH # NOQA
from ._pswindows import IOPRIO_LOW # NOQA
from ._pswindows import IOPRIO_NORMAL # NOQA
from ._pswindows import IOPRIO_HIGH # NOQA
from ._pswindows import IOPRIO_VERYLOW # NOQA
elif MACOS:
from . import _psosx as _psplatform
@@ -209,7 +211,7 @@ if hasattr(_psplatform.Process, "rlimit"):
AF_LINK = _psplatform.AF_LINK
__author__ = "Giampaolo Rodola'"
__version__ = "5.8.0"
__version__ = "5.9.0"
version_info = tuple([int(num) for num in __version__.split('.')])
_timer = getattr(time, 'monotonic', time.time)
@@ -268,7 +270,11 @@ def _assert_pid_not_reused(fun):
@functools.wraps(fun)
def wrapper(self, *args, **kwargs):
if not self.is_running():
raise NoSuchProcess(self.pid, self._name)
if self._pid_reused:
msg = "process no longer exists and its PID has been reused"
else:
msg = None
raise NoSuchProcess(self.pid, self._name, msg=msg)
return fun(self, *args, **kwargs)
return wrapper
@@ -339,6 +345,7 @@ class Process(object):
self._exe = None
self._create_time = None
self._gone = False
self._pid_reused = False
self._hash = None
self._lock = threading.RLock()
# used for caching on Windows only (on POSIX ppid may change)
@@ -363,8 +370,7 @@ class Process(object):
pass
except NoSuchProcess:
if not _ignore_nsp:
msg = 'no process found with pid %s' % pid
raise NoSuchProcess(pid, None, msg)
raise NoSuchProcess(pid, msg='process PID not found')
else:
self._gone = True
# This pair is supposed to indentify a Process instance
@@ -570,7 +576,7 @@ class Process(object):
It also checks if PID has been reused by another process in
which case return False.
"""
if self._gone:
if self._gone or self._pid_reused:
return False
try:
# Checking if PID is alive is not enough as the PID might
@@ -578,7 +584,8 @@ class Process(object):
# verify process identity.
# Process identity / uniqueness over time is guaranteed by
# (PID + creation time) and that is verified in __eq__.
return self == Process(self.pid)
self._pid_reused = self != Process(self.pid)
return not self._pid_reused
except ZombieProcess:
# We should never get here as it's already handled in
# Process.__init__; here just for extra safety.
@@ -1386,7 +1393,6 @@ def pid_exists(pid):
_pmap = {}
_lock = threading.Lock()
def process_iter(attrs=None, ad_value=None):
@@ -1410,58 +1416,59 @@ def process_iter(attrs=None, ad_value=None):
If *attrs* is an empty list it will retrieve all process info
(slow).
"""
global _pmap
def add(pid):
proc = Process(pid)
if attrs is not None:
proc.info = proc.as_dict(attrs=attrs, ad_value=ad_value)
with _lock:
_pmap[proc.pid] = proc
pmap[proc.pid] = proc
return proc
def remove(pid):
with _lock:
_pmap.pop(pid, None)
pmap.pop(pid, None)
pmap = _pmap.copy()
a = set(pids())
b = set(_pmap.keys())
b = set(pmap.keys())
new_pids = a - b
gone_pids = b - a
for pid in gone_pids:
remove(pid)
with _lock:
ls = sorted(list(_pmap.items()) +
list(dict.fromkeys(new_pids).items()))
for pid, proc in ls:
try:
if proc is None: # new process
yield add(pid)
else:
# use is_running() to check whether PID has been reused by
# another process in which case yield a new Process instance
if proc.is_running():
if attrs is not None:
proc.info = proc.as_dict(
attrs=attrs, ad_value=ad_value)
yield proc
else:
try:
ls = sorted(list(pmap.items()) + list(dict.fromkeys(new_pids).items()))
for pid, proc in ls:
try:
if proc is None: # new process
yield add(pid)
except NoSuchProcess:
remove(pid)
except AccessDenied:
# Process creation time can't be determined hence there's
# no way to tell whether the pid of the cached process
# has been reused. Just return the cached version.
if proc is None and pid in _pmap:
try:
yield _pmap[pid]
except KeyError:
# If we get here it is likely that 2 threads were
# using process_iter().
pass
else:
raise
else:
# use is_running() to check whether PID has been
# reused by another process in which case yield a
# new Process instance
if proc.is_running():
if attrs is not None:
proc.info = proc.as_dict(
attrs=attrs, ad_value=ad_value)
yield proc
else:
yield add(pid)
except NoSuchProcess:
remove(pid)
except AccessDenied:
# Process creation time can't be determined hence there's
# no way to tell whether the pid of the cached process
# has been reused. Just return the cached version.
if proc is None and pid in pmap:
try:
yield pmap[pid]
except KeyError:
# If we get here it is likely that 2 threads were
# using process_iter().
pass
else:
raise
finally:
_pmap = pmap
def wait_procs(procs, timeout=None, callback=None):
@@ -1505,6 +1512,8 @@ def wait_procs(procs, timeout=None, callback=None):
returncode = proc.wait(timeout=timeout)
except TimeoutExpired:
pass
except _SubprocessTimeoutExpired:
pass
else:
if returncode is not None or not proc.is_running():
# Set new Process instance attribute.
@@ -1575,7 +1584,7 @@ def cpu_count(logical=True):
if logical:
ret = _psplatform.cpu_count_logical()
else:
ret = _psplatform.cpu_count_physical()
ret = _psplatform.cpu_count_cores()
if ret is not None and ret < 1:
ret = None
return ret
@@ -1721,7 +1730,6 @@ def cpu_percent(interval=None, percpu=False):
def calculate(t1, t2):
times_delta = _cpu_times_deltas(t1, t2)
all_delta = _cpu_tot_time(times_delta)
busy_delta = _cpu_busy_time(times_delta)
@@ -1849,7 +1857,7 @@ def cpu_stats():
if hasattr(_psplatform, "cpu_freq"):
def cpu_freq(percpu=False):
"""Return CPU frequency as a nameduple including current,
"""Return CPU frequency as a namedtuple including current,
min and max frequency expressed in Mhz.
If *percpu* is True and the system supports per-cpu frequency
@@ -2338,6 +2346,15 @@ if WINDOWS:
# =====================================================================
def _set_debug(value):
"""Enable or disable PSUTIL_DEBUG option, which prints debugging
messages to stderr.
"""
import psutil._common
psutil._common.PSUTIL_DEBUG = bool(value)
_psplatform.cext.set_debug(bool(value))
def test(): # pragma: no cover
from ._common import bytes2human
from ._compat import get_terminal_size
+57 -55
View File
@@ -7,8 +7,10 @@
# Note: this module is imported by setup.py so it should not import
# psutil or third-party modules.
from __future__ import division, print_function
from __future__ import division
from __future__ import print_function
import collections
import contextlib
import errno
import functools
@@ -18,12 +20,12 @@ import stat
import sys
import threading
import warnings
from collections import defaultdict
from collections import namedtuple
from socket import AF_INET
from socket import SOCK_DGRAM
from socket import SOCK_STREAM
try:
from socket import AF_INET6
except ImportError:
@@ -41,6 +43,7 @@ else:
# can't take it from _common.py as this script is imported by setup.py
PY3 = sys.version_info[0] == 3
PSUTIL_DEBUG = bool(os.getenv('PSUTIL_DEBUG', 0))
__all__ = [
# OS constants
@@ -83,7 +86,7 @@ WINDOWS = os.name == "nt"
LINUX = sys.platform.startswith("linux")
MACOS = sys.platform.startswith("darwin")
OSX = MACOS # deprecated alias
FREEBSD = sys.platform.startswith("freebsd")
FREEBSD = sys.platform.startswith(("freebsd", "midnightbsd"))
OPENBSD = sys.platform.startswith("openbsd")
NETBSD = sys.platform.startswith("netbsd")
BSD = FREEBSD or OPENBSD or NETBSD
@@ -275,15 +278,32 @@ class Error(Exception):
"""
__module__ = 'psutil'
def __init__(self, msg=""):
Exception.__init__(self, msg)
self.msg = msg
def _infodict(self, attrs):
try:
info = collections.OrderedDict()
except AttributeError: # pragma: no cover
info = {} # Python 2.6
for name in attrs:
value = getattr(self, name, None)
if value:
info[name] = value
return info
def __str__(self):
# invoked on `raise Error`
info = self._infodict(("pid", "ppid", "name"))
if info:
details = "(%s)" % ", ".join(
["%s=%r" % (k, v) for k, v in info.items()])
else:
details = None
return " ".join([x for x in (self.msg, details) if x])
def __repr__(self):
ret = "psutil.%s %s" % (self.__class__.__name__, self.msg)
return ret.strip()
__str__ = __repr__
# invoked on `repr(Error)`
info = self._infodict(("pid", "ppid", "name", "seconds", "msg"))
details = ", ".join(["%s=%r" % (k, v) for k, v in info.items()])
return "psutil.%s(%s)" % (self.__class__.__name__, details)
class NoSuchProcess(Error):
@@ -293,16 +313,10 @@ class NoSuchProcess(Error):
__module__ = 'psutil'
def __init__(self, pid, name=None, msg=None):
Error.__init__(self, msg)
Error.__init__(self)
self.pid = pid
self.name = name
self.msg = msg
if msg is None:
if name:
details = "(pid=%s, name=%s)" % (self.pid, repr(self.name))
else:
details = "(pid=%s)" % self.pid
self.msg = "process no longer exists " + details
self.msg = msg or "process no longer exists"
class ZombieProcess(NoSuchProcess):
@@ -315,19 +329,9 @@ class ZombieProcess(NoSuchProcess):
__module__ = 'psutil'
def __init__(self, pid, name=None, ppid=None, msg=None):
NoSuchProcess.__init__(self, msg)
self.pid = pid
NoSuchProcess.__init__(self, pid, name, msg)
self.ppid = ppid
self.name = name
self.msg = msg
if msg is None:
args = ["pid=%s" % pid]
if name:
args.append("name=%s" % repr(self.name))
if ppid:
args.append("ppid=%s" % self.ppid)
details = "(%s)" % ", ".join(args)
self.msg = "process still exists but it's a zombie " + details
self.msg = msg or "PID still exists but it's a zombie"
class AccessDenied(Error):
@@ -335,17 +339,10 @@ class AccessDenied(Error):
__module__ = 'psutil'
def __init__(self, pid=None, name=None, msg=None):
Error.__init__(self, msg)
Error.__init__(self)
self.pid = pid
self.name = name
self.msg = msg
if msg is None:
if (pid is not None) and (name is not None):
self.msg = "(pid=%s, name=%s)" % (pid, repr(name))
elif (pid is not None):
self.msg = "(pid=%s)" % self.pid
else:
self.msg = ""
self.msg = msg or ""
class TimeoutExpired(Error):
@@ -355,14 +352,11 @@ class TimeoutExpired(Error):
__module__ = 'psutil'
def __init__(self, seconds, pid=None, name=None):
Error.__init__(self, "timeout after %s seconds" % seconds)
Error.__init__(self)
self.seconds = seconds
self.pid = pid
self.name = name
if (pid is not None) and (name is not None):
self.msg += " (pid=%s, name=%s)" % (pid, repr(name))
elif (pid is not None):
self.msg += " (pid=%s)" % self.pid
self.msg = "timeout after %s seconds" % seconds
# ===================================================================
@@ -451,7 +445,13 @@ def memoize_when_activated(fun):
except KeyError:
# case 3: we entered oneshot() ctx but there's no cache
# for this entry yet
ret = self._cache[fun] = fun(self)
ret = fun(self)
try:
self._cache[fun] = ret
except AttributeError:
# multi-threading race condition, see:
# https://github.com/giampaolo/psutil/issues/1948
pass
return ret
def cache_activate(proc):
@@ -622,8 +622,8 @@ class _WrapNumbers:
assert name not in self.reminders
assert name not in self.reminder_keys
self.cache[name] = input_dict
self.reminders[name] = defaultdict(int)
self.reminder_keys[name] = defaultdict(set)
self.reminders[name] = collections.defaultdict(int)
self.reminder_keys[name] = collections.defaultdict(set)
def _remove_dead_reminders(self, input_dict, name):
"""In case the number of keys changed between calls (e.g. a
@@ -832,15 +832,17 @@ def print_color(
SetConsoleTextAttribute(handle, DEFAULT_COLOR)
if bool(os.getenv('PSUTIL_DEBUG', 0)):
import inspect
def debug(msg):
"""If PSUTIL_DEBUG env var is set, print a debug message to stderr."""
def debug(msg):
"""If PSUTIL_DEBUG env var is set, print a debug message to stderr."""
if PSUTIL_DEBUG:
import inspect
fname, lineno, func_name, lines, index = inspect.getframeinfo(
inspect.currentframe().f_back)
if isinstance(msg, Exception):
if isinstance(msg, (OSError, IOError, EnvironmentError)):
# ...because str(exc) may contain info about the file name
msg = "ignoring %s" % msg
else:
msg = "ignoring %r" % msg
print("psutil-debug [%s:%s]> %s" % (fname, lineno, msg), # NOQA
file=sys.stderr)
else:
def debug(msg):
pass
+27 -1
View File
@@ -8,12 +8,14 @@ Python 3 way of doing things).
"""
import collections
import contextlib
import errno
import functools
import os
import sys
import types
__all__ = [
# constants
"PY3",
@@ -25,6 +27,8 @@ __all__ = [
"lru_cache",
# shutil module
"which", "get_terminal_size",
# contextlib module
"redirect_stderr",
# python 3 exceptions
"FileNotFoundError", "PermissionError", "ProcessLookupError",
"InterruptedError", "ChildProcessError", "FileExistsError"]
@@ -410,8 +414,8 @@ except ImportError:
def get_terminal_size(fallback=(80, 24)):
try:
import fcntl
import termios
import struct
import termios
except ImportError:
return fallback
else:
@@ -422,3 +426,25 @@ except ImportError:
return (res[1], res[0])
except Exception:
return fallback
# python 3.3
try:
from subprocess import TimeoutExpired as SubprocessTimeoutExpired
except ImportError:
class SubprocessTimeoutExpired:
pass
# python 3.5
try:
from contextlib import redirect_stderr
except ImportError:
@contextlib.contextmanager
def redirect_stderr(new_target):
original = getattr(sys, "stderr")
try:
setattr(sys, "stderr", new_target)
yield new_target
finally:
setattr(sys, "stderr", original)
+7 -7
View File
@@ -18,20 +18,20 @@ from . import _common
from . import _psposix
from . import _psutil_aix as cext
from . import _psutil_posix as cext_posix
from ._common import AccessDenied
from ._common import conn_to_ntuple
from ._common import get_procfs_path
from ._common import memoize_when_activated
from ._common import NIC_DUPLEX_FULL
from ._common import NIC_DUPLEX_HALF
from ._common import NIC_DUPLEX_UNKNOWN
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import usage_percent
from ._common import ZombieProcess
from ._common import conn_to_ntuple
from ._common import get_procfs_path
from ._common import memoize_when_activated
from ._common import usage_percent
from ._compat import PY3
from ._compat import FileNotFoundError
from ._compat import PermissionError
from ._compat import ProcessLookupError
from ._compat import PY3
__extra__all__ = ["PROCFS_PATH"]
@@ -143,7 +143,7 @@ def cpu_count_logical():
return None
def cpu_count_physical():
def cpu_count_cores():
cmd = "lsdev -Cc processor"
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
+11 -12
View File
@@ -9,24 +9,24 @@ import errno
import functools
import os
import xml.etree.ElementTree as ET
from collections import namedtuple
from collections import defaultdict
from collections import namedtuple
from . import _common
from . import _psposix
from . import _psutil_bsd as cext
from . import _psutil_posix as cext_posix
from ._common import FREEBSD
from ._common import NETBSD
from ._common import OPENBSD
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import ZombieProcess
from ._common import conn_tmap
from ._common import conn_to_ntuple
from ._common import FREEBSD
from ._common import memoize
from ._common import memoize_when_activated
from ._common import NETBSD
from ._common import NoSuchProcess
from ._common import OPENBSD
from ._common import usage_percent
from ._common import ZombieProcess
from ._compat import FileNotFoundError
from ._compat import PermissionError
from ._compat import ProcessLookupError
@@ -249,19 +249,19 @@ def cpu_count_logical():
if OPENBSD or NETBSD:
def cpu_count_physical():
def cpu_count_cores():
# OpenBSD and NetBSD do not implement this.
return 1 if cpu_count_logical() == 1 else None
else:
def cpu_count_physical():
"""Return the number of physical CPUs in the system."""
def cpu_count_cores():
"""Return the number of CPU cores in the system."""
# From the C module we'll get an XML string similar to this:
# http://manpages.ubuntu.com/manpages/precise/man4/smp.4freebsd.html
# We may get None in case "sysctl kern.sched.topology_spec"
# is not supported on this BSD version, in which case we'll mimic
# os.cpu_count() and return None.
ret = None
s = cext.cpu_count_phys()
s = cext.cpu_topology()
if s is not None:
# get rid of padding chars appended at the end of the string
index = s.rfind("</groups>")
@@ -274,8 +274,7 @@ else:
# needed otherwise it will memleak
root.clear()
if not ret:
# If logical CPUs are 1 it's obvious we'll have only 1
# physical CPU.
# If logical CPUs == 1 it's obvious we' have only 1 core.
if cpu_count_logical() == 1:
return 1
return ret
+129 -40
View File
@@ -25,30 +25,31 @@ from . import _common
from . import _psposix
from . import _psutil_linux as cext
from . import _psutil_posix as cext_posix
from ._common import NIC_DUPLEX_FULL
from ._common import NIC_DUPLEX_HALF
from ._common import NIC_DUPLEX_UNKNOWN
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import ZombieProcess
from ._common import debug
from ._common import decode
from ._common import get_procfs_path
from ._common import isfile_strict
from ._common import memoize
from ._common import memoize_when_activated
from ._common import NIC_DUPLEX_FULL
from ._common import NIC_DUPLEX_HALF
from ._common import NIC_DUPLEX_UNKNOWN
from ._common import NoSuchProcess
from ._common import open_binary
from ._common import open_text
from ._common import parse_environ_block
from ._common import path_exists_strict
from ._common import supports_ipv6
from ._common import usage_percent
from ._common import ZombieProcess
from ._compat import b
from ._compat import basestring
from ._compat import PY3
from ._compat import FileNotFoundError
from ._compat import PermissionError
from ._compat import ProcessLookupError
from ._compat import PY3
from ._compat import b
from ._compat import basestring
if sys.version_info >= (3, 4):
import enum
@@ -656,8 +657,8 @@ def cpu_count_logical():
return num
def cpu_count_physical():
"""Return the number of physical cores in the system."""
def cpu_count_cores():
"""Return the number of CPU cores in the system."""
# Method #1
ls = set()
# These 2 files are the same but */core_cpus_list is newer while
@@ -719,6 +720,17 @@ def cpu_stats():
ctx_switches, interrupts, soft_interrupts, syscalls)
def _cpu_get_cpuinfo_freq():
"""Return current CPU frequency from cpuinfo if available.
"""
ret = []
with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
for line in f:
if line.lower().startswith(b'cpu mhz'):
ret.append(float(line.split(b':', 1)[1]))
return ret
if os.path.exists("/sys/devices/system/cpu/cpufreq/policy0") or \
os.path.exists("/sys/devices/system/cpu/cpu0/cpufreq"):
def cpu_freq():
@@ -726,20 +738,20 @@ if os.path.exists("/sys/devices/system/cpu/cpufreq/policy0") or \
Contrarily to other OSes, Linux updates these values in
real-time.
"""
def get_path(num):
for p in ("/sys/devices/system/cpu/cpufreq/policy%s" % num,
"/sys/devices/system/cpu/cpu%s/cpufreq" % num):
if os.path.exists(p):
return p
cpuinfo_freqs = _cpu_get_cpuinfo_freq()
paths = \
glob.glob("/sys/devices/system/cpu/cpufreq/policy[0-9]*") or \
glob.glob("/sys/devices/system/cpu/cpu[0-9]*/cpufreq")
paths.sort(key=lambda x: int(re.search(r"[0-9]+", x).group()))
ret = []
for n in range(cpu_count_logical()):
path = get_path(n)
if not path:
continue
pjoin = os.path.join
curr = cat(pjoin(path, "scaling_cur_freq"), fallback=None)
pjoin = os.path.join
for i, path in enumerate(paths):
if len(paths) == len(cpuinfo_freqs):
# take cached value from cpuinfo if available, see:
# https://github.com/giampaolo/psutil/issues/1851
curr = cpuinfo_freqs[i]
else:
curr = cat(pjoin(path, "scaling_cur_freq"), fallback=None)
if curr is None:
# Likely an old RedHat, see:
# https://github.com/giampaolo/psutil/issues/1071
@@ -753,24 +765,12 @@ if os.path.exists("/sys/devices/system/cpu/cpufreq/policy0") or \
ret.append(_common.scpufreq(curr, min_, max_))
return ret
elif os.path.exists("/proc/cpuinfo"):
else:
def cpu_freq():
"""Alternate implementation using /proc/cpuinfo.
min and max frequencies are not available and are set to None.
"""
ret = []
with open_binary('%s/cpuinfo' % get_procfs_path()) as f:
for line in f:
if line.lower().startswith(b'cpu mhz'):
key, value = line.split(b':', 1)
ret.append(_common.scpufreq(float(value), 0., 0.))
return ret
else:
def cpu_freq():
"""Dummy implementation when none of the above files are present.
"""
return []
return [_common.scpufreq(x, 0., 0.) for x in _cpu_get_cpuinfo_freq()]
# =====================================================================
@@ -834,6 +834,10 @@ class Connections:
if err.errno == errno.EINVAL:
# not a link
continue
if err.errno == errno.ENAMETOOLONG:
# file name too long
debug(err)
continue
raise
else:
if inode.startswith('socket:['):
@@ -1080,6 +1084,8 @@ def net_if_stats():
# https://github.com/giampaolo/psutil/issues/1279
if err.errno != errno.ENODEV:
raise
else:
debug(err)
else:
ret[name] = _common.snicstats(isup, duplex_map[duplex], speed, mtu)
return ret
@@ -1188,6 +1194,80 @@ def disk_io_counters(perdisk=False):
return retdict
class RootFsDeviceFinder:
"""disk_partitions() may return partitions with device == "/dev/root"
or "rootfs". This container class uses different strategies to try to
obtain the real device path. Resources:
https://bootlin.com/blog/find-root-device/
https://www.systutorials.com/how-to-find-the-disk-where-root-is-on-in-bash-on-linux/
"""
__slots__ = ['major', 'minor']
def __init__(self):
dev = os.stat("/").st_dev
self.major = os.major(dev)
self.minor = os.minor(dev)
def ask_proc_partitions(self):
with open_text("%s/partitions" % get_procfs_path()) as f:
for line in f.readlines()[2:]:
fields = line.split()
if len(fields) < 4: # just for extra safety
continue
major = int(fields[0]) if fields[0].isdigit() else None
minor = int(fields[1]) if fields[1].isdigit() else None
name = fields[3]
if major == self.major and minor == self.minor:
if name: # just for extra safety
return "/dev/%s" % name
def ask_sys_dev_block(self):
path = "/sys/dev/block/%s:%s/uevent" % (self.major, self.minor)
with open_text(path) as f:
for line in f:
if line.startswith("DEVNAME="):
name = line.strip().rpartition("DEVNAME=")[2]
if name: # just for extra safety
return "/dev/%s" % name
def ask_sys_class_block(self):
needle = "%s:%s" % (self.major, self.minor)
files = glob.iglob("/sys/class/block/*/dev")
for file in files:
try:
f = open_text(file)
except FileNotFoundError: # race condition
continue
else:
with f:
data = f.read().strip()
if data == needle:
name = os.path.basename(os.path.dirname(file))
return "/dev/%s" % name
def find(self):
path = None
if path is None:
try:
path = self.ask_proc_partitions()
except (IOError, OSError) as err:
debug(err)
if path is None:
try:
path = self.ask_sys_dev_block()
except (IOError, OSError) as err:
debug(err)
if path is None:
try:
path = self.ask_sys_class_block()
except (IOError, OSError) as err:
debug(err)
# We use exists() because the "/dev/*" part of the path is hard
# coded, so we want to be sure.
if path is not None and os.path.exists(path):
return path
def disk_partitions(all=False):
"""Return mounted disk partitions as a list of namedtuples."""
fstypes = set()
@@ -1215,6 +1295,8 @@ def disk_partitions(all=False):
device, mountpoint, fstype, opts = partition
if device == 'none':
device = ''
if device in ("/dev/root", "rootfs"):
device = RootFsDeviceFinder().find() or device
if not all:
if device == '' or fstype not in fstypes:
continue
@@ -1310,7 +1392,7 @@ def sensors_temperatures():
path = os.path.join(base, 'type')
unit_name = cat(path, binary=False)
except (IOError, OSError, ValueError) as err:
debug("ignoring %r for file %r" % (err, path))
debug(err)
continue
trip_paths = glob.glob(base + '/trip_point*')
@@ -1366,7 +1448,7 @@ def sensors_fans():
try:
current = int(cat(base + '_input'))
except (IOError, OSError) as err:
warnings.warn("ignoring %r" % err, RuntimeWarning)
debug(err)
continue
unit_name = cat(os.path.join(os.path.dirname(base), 'name'),
binary=False)
@@ -1392,7 +1474,10 @@ def sensors_battery():
for path in paths:
ret = cat(path, fallback=null)
if ret != null:
return int(ret) if ret.isdigit() else ret
try:
return int(ret)
except ValueError:
return ret
return None
bats = [x for x in os.listdir(POWER_SUPPLY_PATH) if x.startswith('BAT') or
@@ -2100,6 +2185,10 @@ class Process(object):
if err.errno == errno.EINVAL:
# not a link
continue
if err.errno == errno.ENAMETOOLONG:
# file name too long
debug(err)
continue
raise
else:
# If path is not an absolute there's no way to tell
+15 -52
View File
@@ -4,7 +4,6 @@
"""macOS platform implementation."""
import contextlib
import errno
import functools
import os
@@ -15,14 +14,14 @@ from . import _psposix
from . import _psutil_osx as cext
from . import _psutil_posix as cext_posix
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import ZombieProcess
from ._common import conn_tmap
from ._common import conn_to_ntuple
from ._common import isfile_strict
from ._common import memoize_when_activated
from ._common import NoSuchProcess
from ._common import parse_environ_block
from ._common import usage_percent
from ._common import ZombieProcess
from ._compat import PermissionError
from ._compat import ProcessLookupError
@@ -159,9 +158,9 @@ def cpu_count_logical():
return cext.cpu_count_logical()
def cpu_count_physical():
"""Return the number of physical CPUs in the system."""
return cext.cpu_count_phys()
def cpu_count_cores():
"""Return the number of CPU cores in the system."""
return cext.cpu_count_cores()
def cpu_stats():
@@ -354,32 +353,6 @@ def wrap_exceptions(fun):
return wrapper
@contextlib.contextmanager
def catch_zombie(proc):
"""There are some poor C APIs which incorrectly raise ESRCH when
the process is still alive or it's a zombie, or even RuntimeError
(those who don't set errno). This is here in order to solve:
https://github.com/giampaolo/psutil/issues/1044
"""
try:
yield
except (OSError, RuntimeError) as err:
if isinstance(err, RuntimeError) or err.errno == errno.ESRCH:
try:
# status() is not supposed to lie and correctly detect
# zombies so if it raises ESRCH it's true.
status = proc.status()
except NoSuchProcess:
raise err
else:
if status == _common.STATUS_ZOMBIE:
raise ZombieProcess(proc.pid, proc._name, proc._ppid)
else:
raise AccessDenied(proc.pid, proc._name)
else:
raise
class Process(object):
"""Wrapper class around underlying C implementation."""
@@ -402,8 +375,7 @@ class Process(object):
@memoize_when_activated
def _get_pidtaskinfo(self):
# Note: should work for PIDs owned by user only.
with catch_zombie(self):
ret = cext.proc_pidtaskinfo_oneshot(self.pid)
ret = cext.proc_pidtaskinfo_oneshot(self.pid)
assert len(ret) == len(pidtaskinfo_map)
return ret
@@ -422,18 +394,15 @@ class Process(object):
@wrap_exceptions
def exe(self):
with catch_zombie(self):
return cext.proc_exe(self.pid)
return cext.proc_exe(self.pid)
@wrap_exceptions
def cmdline(self):
with catch_zombie(self):
return cext.proc_cmdline(self.pid)
return cext.proc_cmdline(self.pid)
@wrap_exceptions
def environ(self):
with catch_zombie(self):
return parse_environ_block(cext.proc_environ(self.pid))
return parse_environ_block(cext.proc_environ(self.pid))
@wrap_exceptions
def ppid(self):
@@ -442,8 +411,7 @@ class Process(object):
@wrap_exceptions
def cwd(self):
with catch_zombie(self):
return cext.proc_cwd(self.pid)
return cext.proc_cwd(self.pid)
@wrap_exceptions
def uids(self):
@@ -516,8 +484,7 @@ class Process(object):
if self.pid == 0:
return []
files = []
with catch_zombie(self):
rawlist = cext.proc_open_files(self.pid)
rawlist = cext.proc_open_files(self.pid)
for path, fd in rawlist:
if isfile_strict(path):
ntuple = _common.popenfile(path, fd)
@@ -530,8 +497,7 @@ class Process(object):
raise ValueError("invalid %r kind argument; choose between %s"
% (kind, ', '.join([repr(x) for x in conn_tmap])))
families, types = conn_tmap[kind]
with catch_zombie(self):
rawlist = cext.proc_connections(self.pid, families, types)
rawlist = cext.proc_connections(self.pid, families, types)
ret = []
for item in rawlist:
fd, fam, type, laddr, raddr, status = item
@@ -544,8 +510,7 @@ class Process(object):
def num_fds(self):
if self.pid == 0:
return 0
with catch_zombie(self):
return cext.proc_num_fds(self.pid)
return cext.proc_num_fds(self.pid)
@wrap_exceptions
def wait(self, timeout=None):
@@ -553,13 +518,11 @@ class Process(object):
@wrap_exceptions
def nice_get(self):
with catch_zombie(self):
return cext_posix.getpriority(self.pid)
return cext_posix.getpriority(self.pid)
@wrap_exceptions
def nice_set(self, value):
with catch_zombie(self):
return cext_posix.setpriority(self.pid, value)
return cext_posix.setpriority(self.pid, value)
@wrap_exceptions
def status(self):
+3 -2
View File
@@ -10,18 +10,19 @@ import signal
import sys
import time
from ._common import TimeoutExpired
from ._common import memoize
from ._common import sdiskusage
from ._common import TimeoutExpired
from ._common import usage_percent
from ._compat import PY3
from ._compat import ChildProcessError
from ._compat import FileNotFoundError
from ._compat import InterruptedError
from ._compat import PermissionError
from ._compat import ProcessLookupError
from ._compat import PY3
from ._compat import unicode
if sys.version_info >= (3, 4):
import enum
else:
+10 -10
View File
@@ -17,22 +17,22 @@ from . import _common
from . import _psposix
from . import _psutil_posix as cext_posix
from . import _psutil_sunos as cext
from ._common import AccessDenied
from ._common import AF_INET6
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import ZombieProcess
from ._common import debug
from ._common import get_procfs_path
from ._common import isfile_strict
from ._common import memoize_when_activated
from ._common import NoSuchProcess
from ._common import sockfam_to_enum
from ._common import socktype_to_enum
from ._common import usage_percent
from ._common import ZombieProcess
from ._compat import b
from ._compat import PY3
from ._compat import FileNotFoundError
from ._compat import PermissionError
from ._compat import ProcessLookupError
from ._compat import PY3
from ._compat import b
__extra__all__ = ["CONN_IDLE", "CONN_BOUND", "PROCFS_PATH"]
@@ -155,7 +155,7 @@ def swap_memory():
total = free = 0
for line in lines:
line = line.split()
t, f = line[3:4]
t, f = line[3:5]
total += int(int(t) * 512)
free += int(int(f) * 512)
used = total - free
@@ -190,9 +190,9 @@ def cpu_count_logical():
return None
def cpu_count_physical():
"""Return the number of physical CPUs in the system."""
return cext.cpu_count_phys()
def cpu_count_cores():
"""Return the number of CPU cores in the system."""
return cext.cpu_count_cores()
def cpu_stats():
@@ -231,7 +231,7 @@ def disk_partitions(all=False):
continue
except OSError as err:
# https://github.com/giampaolo/psutil/issues/1674
debug("skipping %r: %r" % (mountpoint, err))
debug("skipping %r: %s" % (mountpoint, err))
continue
maxfile = maxpath = None # set later
ntuple = _common.sdiskpart(device, mountpoint, fstype, opts,
+21 -12
View File
@@ -14,22 +14,22 @@ import time
from collections import namedtuple
from . import _common
from ._common import ENCODING
from ._common import ENCODING_ERRS
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import TimeoutExpired
from ._common import conn_tmap
from ._common import conn_to_ntuple
from ._common import debug
from ._common import ENCODING
from ._common import ENCODING_ERRS
from ._common import isfile_strict
from ._common import memoize
from ._common import memoize_when_activated
from ._common import NoSuchProcess
from ._common import parse_environ_block
from ._common import TimeoutExpired
from ._common import usage_percent
from ._compat import PY3
from ._compat import long
from ._compat import lru_cache
from ._compat import PY3
from ._compat import range
from ._compat import unicode
from ._psutil_windows import ABOVE_NORMAL_PRIORITY_CLASS
@@ -39,6 +39,7 @@ from ._psutil_windows import IDLE_PRIORITY_CLASS
from ._psutil_windows import NORMAL_PRIORITY_CLASS
from ._psutil_windows import REALTIME_PRIORITY_CLASS
try:
from . import _psutil_windows as cext
except ImportError as err:
@@ -197,7 +198,7 @@ def convert_dos_path(s):
"C:\Windows\systemew\file.txt"
"""
rawdrive = '\\'.join(s.split('\\')[:3])
driveletter = cext.win32_QueryDosDevice(rawdrive)
driveletter = cext.QueryDosDevice(rawdrive)
remainder = s[len(rawdrive):]
return os.path.join(driveletter, remainder)
@@ -241,8 +242,16 @@ def virtual_memory():
def swap_memory():
"""Swap system memory as a (total, used, free, sin, sout) tuple."""
mem = cext.virtual_mem()
total = mem[2]
free = mem[3]
total_phys = mem[0]
free_phys = mem[1]
total_system = mem[2]
free_system = mem[3]
# Despite the name PageFile refers to total system memory here
# thus physical memory values need to be substracted to get swap values
total = total_system - total_phys
free = min(total, free_system - free_phys)
used = total - free
percent = usage_percent(used, total, round_=1)
return _common.sswap(total, used, free, percent, 0, 0)
@@ -304,9 +313,9 @@ def cpu_count_logical():
return cext.cpu_count_logical()
def cpu_count_physical():
"""Return the number of physical CPU cores in the system."""
return cext.cpu_count_phys()
def cpu_count_cores():
"""Return the number of CPU cores in the system."""
return cext.cpu_count_cores()
def cpu_stats():
@@ -759,7 +768,7 @@ class Process(object):
# 24 = ERROR_TOO_MANY_OPEN_FILES. Not sure why this happens
# (perhaps PyPy's JIT delaying garbage collection of files?).
if err.errno == 24:
debug("%r forced into AccessDenied" % err)
debug("%r translated into AccessDenied" % err)
raise AccessDenied(self.pid, self._name)
raise
else:
@@ -9,6 +9,7 @@ Test utilities.
"""
from __future__ import print_function
import atexit
import contextlib
import ctypes
@@ -46,15 +47,16 @@ from psutil import WINDOWS
from psutil._common import bytes2human
from psutil._common import print_color
from psutil._common import supports_ipv6
from psutil._compat import PY3
from psutil._compat import FileExistsError
from psutil._compat import FileNotFoundError
from psutil._compat import PY3
from psutil._compat import range
from psutil._compat import super
from psutil._compat import u
from psutil._compat import unicode
from psutil._compat import which
if PY3:
import unittest
else:
@@ -72,6 +74,9 @@ if sys.version_info >= (3, 4):
else:
enum = None
if POSIX:
from psutil._psposix import wait_pid
__all__ = [
# constants
@@ -482,9 +487,6 @@ def terminate(proc_or_pid, sig=signal.SIGTERM, wait_timeout=GLOBAL_TIMEOUT):
Does nothing if the process does not exist.
Return process exit status.
"""
if POSIX:
from psutil._psposix import wait_pid
def wait(proc, timeout):
if isinstance(proc, subprocess.Popen) and not PY3:
proc.wait()
@@ -952,6 +954,15 @@ class TestMemoryLeak(PsutilTestCase):
retries = 10 if CI_TESTING else 5
verbose = True
_thisproc = psutil.Process()
_psutil_debug_orig = bool(os.getenv('PSUTIL_DEBUG', 0))
@classmethod
def setUpClass(cls):
psutil._set_debug(False) # avoid spamming to stderr
@classmethod
def tearDownClass(cls):
psutil._set_debug(cls._psutil_debug_orig)
def _get_mem(self):
# USS is the closest thing we have to "real" memory usage and it
@@ -1724,8 +1735,8 @@ else:
in memory via ctypes.
Return the new absolutized, normcased path.
"""
from ctypes import wintypes
from ctypes import WinError
from ctypes import wintypes
ext = ".dll"
dst = get_testfn(suffix=suffix + ext)
libs = [x.path for x in psutil.Process().memory_maps() if
@@ -10,4 +10,6 @@ $ python -m psutil.tests
"""
from .runner import main
main()
@@ -21,6 +21,7 @@ Parallel:
"""
from __future__ import print_function
import atexit
import optparse
import os
@@ -28,6 +29,8 @@ import sys
import textwrap
import time
import unittest
try:
import ctypes
except ImportError:
@@ -298,16 +301,15 @@ def get_runner(parallel=False):
# Used by test_*,py modules.
def run_from_name(name):
if CI_TESTING:
print_sysinfo()
suite = TestLoader().from_name(name)
runner = get_runner()
runner.run(suite)
def setup():
# Note: doc states that altering os.environment may cause memory
# leaks on some platforms.
# Sets PSUTIL_TESTING and PSUTIL_DEBUG in the C module.
psutil._psplatform.cext.set_testing()
psutil._set_debug(True)
def main():
@@ -10,11 +10,11 @@
import re
import psutil
from psutil import AIX
from psutil.tests import PsutilTestCase
from psutil.tests import sh
from psutil.tests import unittest
import psutil
@unittest.skipIf(not AIX, "AIX only")
@@ -20,12 +20,12 @@ from psutil import BSD
from psutil import FREEBSD
from psutil import NETBSD
from psutil import OPENBSD
from psutil.tests import spawn_testproc
from psutil.tests import HAS_BATTERY
from psutil.tests import TOLERANCE_SYS_MEM
from psutil.tests import PsutilTestCase
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import TOLERANCE_SYS_MEM
from psutil.tests import spawn_testproc
from psutil.tests import terminate
from psutil.tests import unittest
from psutil.tests import which
@@ -8,7 +8,6 @@
import os
import socket
import sys
import textwrap
from contextlib import closing
from socket import AF_INET
@@ -28,17 +27,17 @@ from psutil import WINDOWS
from psutil._common import supports_ipv6
from psutil._compat import PY3
from psutil.tests import AF_UNIX
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import SKIP_SYSCONS
from psutil.tests import PsutilTestCase
from psutil.tests import bind_socket
from psutil.tests import bind_unix_socket
from psutil.tests import check_connection_ntuple
from psutil.tests import create_sockets
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import PsutilTestCase
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import serialrun
from psutil.tests import skip_on_access_denied
from psutil.tests import SKIP_SYSCONS
from psutil.tests import tcp_socketpair
from psutil.tests import unittest
from psutil.tests import unix_socketpair
@@ -47,7 +46,6 @@ from psutil.tests import wait_for_file
thisproc = psutil.Process()
SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
PYTHON_39 = sys.version_info[:2] == (3, 9)
@serialrun
@@ -18,6 +18,7 @@ import sys
import time
import traceback
import psutil
from psutil import AIX
from psutil import BSD
from psutil import FREEBSD
@@ -33,25 +34,24 @@ from psutil._compat import FileNotFoundError
from psutil._compat import long
from psutil._compat import range
from psutil.tests import APPVEYOR
from psutil.tests import check_connection_ntuple
from psutil.tests import CI_TESTING
from psutil.tests import create_sockets
from psutil.tests import enum
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import PYPY
from psutil.tests import SKIP_SYSCONS
from psutil.tests import VALID_PROC_STATUSES
from psutil.tests import PsutilTestCase
from psutil.tests import check_connection_ntuple
from psutil.tests import create_sockets
from psutil.tests import enum
from psutil.tests import is_namedtuple
from psutil.tests import kernel_version
from psutil.tests import process_namespace
from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import serialrun
from psutil.tests import SKIP_SYSCONS
from psutil.tests import unittest
from psutil.tests import VALID_PROC_STATUSES
import psutil
# ===================================================================
@@ -360,7 +360,6 @@ def proc_info(pid):
elif isinstance(exc, psutil.NoSuchProcess):
tcase.assertProcessGone(proc)
str(exc)
assert exc.msg
def do_wait():
if pid != 0:
+118 -32
View File
@@ -7,6 +7,7 @@
"""Linux specific tests."""
from __future__ import division
import collections
import contextlib
import errno
@@ -23,31 +24,39 @@ import warnings
import psutil
from psutil import LINUX
from psutil._compat import basestring
from psutil._compat import FileNotFoundError
from psutil._compat import PY3
from psutil._compat import FileNotFoundError
from psutil._compat import basestring
from psutil._compat import u
from psutil.tests import call_until
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import GLOBAL_TIMEOUT
from psutil.tests import HAS_BATTERY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_GETLOADAVG
from psutil.tests import HAS_RLIMIT
from psutil.tests import mock
from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import TOLERANCE_DISK_USAGE
from psutil.tests import TOLERANCE_SYS_MEM
from psutil.tests import PsutilTestCase
from psutil.tests import ThreadTask
from psutil.tests import call_until
from psutil.tests import mock
from psutil.tests import reload_module
from psutil.tests import retry_on_failure
from psutil.tests import safe_rmpath
from psutil.tests import sh
from psutil.tests import skip_on_not_implemented
from psutil.tests import ThreadTask
from psutil.tests import TOLERANCE_DISK_USAGE
from psutil.tests import TOLERANCE_SYS_MEM
from psutil.tests import unittest
from psutil.tests import which
if LINUX:
from psutil._pslinux import CLOCK_TICKS
from psutil._pslinux import RootFsDeviceFinder
from psutil._pslinux import calculate_avail_vmem
from psutil._pslinux import open_binary
HERE = os.path.abspath(os.path.dirname(__file__))
SIOCGIFADDR = 0x8915
SIOCGIFCONF = 0x8912
@@ -58,6 +67,7 @@ if LINUX:
SECTOR_SIZE = 512
EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
# =====================================================================
# --- utils
# =====================================================================
@@ -141,7 +151,7 @@ def free_swap():
"""Parse 'free' cmd and return swap memory's s total, used and free
values.
"""
out = sh('free -b', env={"LANG": "C.UTF-8"})
out = sh(["free", "-b"], env={"LANG": "C.UTF-8"})
lines = out.split('\n')
for line in lines:
if line.startswith('Swap'):
@@ -160,7 +170,7 @@ def free_physmem():
# and 'cached' memory which may have different positions so we
# do not return them.
# https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
out = sh('free -b', env={"LANG": "C.UTF-8"})
out = sh(["free", "-b"], env={"LANG": "C.UTF-8"})
lines = out.split('\n')
for line in lines:
if line.startswith('Mem'):
@@ -174,7 +184,7 @@ def free_physmem():
def vmstat(stat):
out = sh("vmstat -s", env={"LANG": "C.UTF-8"})
out = sh(["vmstat", "-s"], env={"LANG": "C.UTF-8"})
for line in out.split("\n"):
line = line.strip()
if stat in line:
@@ -183,7 +193,7 @@ def vmstat(stat):
def get_free_version_info():
out = sh("free -V").strip()
out = sh(["free", "-V"]).strip()
if 'UNKNOWN' in out:
raise unittest.SkipTest("can't determine free version")
return tuple(map(int, out.split()[-1].split('.')))
@@ -243,7 +253,8 @@ class TestSystemVirtualMemory(PsutilTestCase):
# self.assertEqual(free_value, psutil_value)
vmstat_value = vmstat('total memory') * 1024
psutil_value = psutil.virtual_memory().total
self.assertAlmostEqual(vmstat_value, psutil_value)
self.assertAlmostEqual(
vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
@retry_on_failure()
def test_used(self):
@@ -303,7 +314,7 @@ class TestSystemVirtualMemory(PsutilTestCase):
def test_available(self):
# "free" output format has changed at some point:
# https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
out = sh("free -b")
out = sh(["free", "-b"])
lines = out.split('\n')
if 'available' not in lines[0]:
raise unittest.SkipTest("free does not support 'available' column")
@@ -357,9 +368,6 @@ class TestSystemVirtualMemory(PsutilTestCase):
def test_avail_old_percent(self):
# Make sure that our calculation of avail mem for old kernels
# is off by max 15%.
from psutil._pslinux import calculate_avail_vmem
from psutil._pslinux import open_binary
mems = {}
with open_binary('/proc/meminfo') as f:
for line in f:
@@ -715,7 +723,7 @@ class TestSystemCPUCountLogical(PsutilTestCase):
@unittest.skipIf(not LINUX, "LINUX only")
class TestSystemCPUCountPhysical(PsutilTestCase):
class TestSystemCPUCountCores(PsutilTestCase):
@unittest.skipIf(not which("lscpu"), "lscpu utility not available")
def test_against_lscpu(self):
@@ -728,9 +736,9 @@ class TestSystemCPUCountPhysical(PsutilTestCase):
self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))
def test_method_2(self):
meth_1 = psutil._pslinux.cpu_count_physical()
meth_1 = psutil._pslinux.cpu_count_cores()
with mock.patch('glob.glob', return_value=[]) as m:
meth_2 = psutil._pslinux.cpu_count_physical()
meth_2 = psutil._pslinux.cpu_count_cores()
assert m.called
if meth_1 is not None:
self.assertEqual(meth_1, meth_2)
@@ -738,7 +746,7 @@ class TestSystemCPUCountPhysical(PsutilTestCase):
def test_emulate_none(self):
with mock.patch('glob.glob', return_value=[]) as m1:
with mock.patch('psutil._common.open', create=True) as m2:
self.assertIsNone(psutil._pslinux.cpu_count_physical())
self.assertIsNone(psutil._pslinux.cpu_count_cores())
assert m1.called
assert m2.called
@@ -768,18 +776,14 @@ class TestSystemCPUFrequency(PsutilTestCase):
if path.startswith('/sys/devices/system/cpu/'):
return False
else:
if path == "/proc/cpuinfo":
flags.append(None)
return os_path_exists(path)
flags = []
os_path_exists = os.path.exists
try:
with mock.patch("os.path.exists", side_effect=path_exists_mock):
reload_module(psutil._pslinux)
ret = psutil.cpu_freq()
assert ret
assert flags
self.assertEqual(ret.max, 0.0)
self.assertEqual(ret.min, 0.0)
for freq in psutil.cpu_freq(percpu=True):
@@ -1263,6 +1267,68 @@ class TestSystemDiskIoCounters(PsutilTestCase):
self.assertRaises(NotImplementedError, psutil.disk_io_counters)
@unittest.skipIf(not LINUX, "LINUX only")
class TestRootFsDeviceFinder(PsutilTestCase):
def setUp(self):
dev = os.stat("/").st_dev
self.major = os.major(dev)
self.minor = os.minor(dev)
def test_call_methods(self):
finder = RootFsDeviceFinder()
if os.path.exists("/proc/partitions"):
finder.ask_proc_partitions()
else:
self.assertRaises(FileNotFoundError, finder.ask_proc_partitions)
if os.path.exists("/sys/dev/block/%s:%s/uevent" % (
self.major, self.minor)):
finder.ask_sys_dev_block()
else:
self.assertRaises(FileNotFoundError, finder.ask_sys_dev_block)
finder.ask_sys_class_block()
@unittest.skipIf(GITHUB_ACTIONS, "unsupported on GITHUB_ACTIONS")
def test_comparisons(self):
finder = RootFsDeviceFinder()
self.assertIsNotNone(finder.find())
a = b = c = None
if os.path.exists("/proc/partitions"):
a = finder.ask_proc_partitions()
if os.path.exists("/sys/dev/block/%s:%s/uevent" % (
self.major, self.minor)):
b = finder.ask_sys_class_block()
c = finder.ask_sys_dev_block()
base = a or b or c
if base and a:
self.assertEqual(base, a)
if base and b:
self.assertEqual(base, b)
if base and c:
self.assertEqual(base, c)
@unittest.skipIf(not which("findmnt"), "findmnt utility not available")
@unittest.skipIf(GITHUB_ACTIONS, "unsupported on GITHUB_ACTIONS")
def test_against_findmnt(self):
psutil_value = RootFsDeviceFinder().find()
findmnt_value = sh("findmnt -o SOURCE -rn /")
self.assertEqual(psutil_value, findmnt_value)
def test_disk_partitions_mocked(self):
with mock.patch(
'psutil._pslinux.cext.disk_partitions',
return_value=[('/dev/root', '/', 'ext4', 'rw')]) as m:
part = psutil.disk_partitions()[0]
assert m.called
if not GITHUB_ACTIONS:
self.assertNotEqual(part.device, "/dev/root")
self.assertEqual(part.device, RootFsDeviceFinder().find())
else:
self.assertEqual(part.device, "/dev/root")
# =====================================================================
# --- misc
# =====================================================================
@@ -1431,9 +1497,7 @@ class TestMisc(PsutilTestCase):
# - Process(tid) is supposed to work
# - pids() should not return the TID
# See: https://github.com/giampaolo/psutil/issues/687
t = ThreadTask()
t.start()
try:
with ThreadTask():
p = psutil.Process()
threads = p.threads()
self.assertEqual(len(threads), 2)
@@ -1442,8 +1506,6 @@ class TestMisc(PsutilTestCase):
pt = psutil.Process(tid)
pt.as_dict()
self.assertNotIn(tid, psutil.pids())
finally:
t.stop()
def test_pid_exists_no_proc_status(self):
# Internally pid_exists relies on /proc/{pid}/status.
@@ -1837,6 +1899,22 @@ class TestProcess(PsutilTestCase):
assert not files
assert m.called
def test_open_files_enametoolong(self):
# Simulate a case where /proc/{pid}/fd/{fd} symlink
# points to a file with full path longer than PATH_MAX, see:
# https://github.com/giampaolo/psutil/issues/1940
p = psutil.Process()
files = p.open_files()
with open(self.get_testfn(), 'w'):
# give the kernel some time to see the new file
call_until(p.open_files, "len(ret) != %i" % len(files))
patch_point = 'psutil._pslinux.os.readlink'
with mock.patch(patch_point,
side_effect=OSError(errno.ENAMETOOLONG, "")) as m:
files = p.open_files()
assert not files
assert m.called
# --- mocked tests
def test_terminal_mocked(self):
@@ -1985,8 +2063,6 @@ class TestProcess(PsutilTestCase):
self.assertEqual(exc.exception.name, p.name())
def test_stat_file_parsing(self):
from psutil._pslinux import CLOCK_TICKS
args = [
"0", # pid
"(cat)", # name
@@ -2072,6 +2148,16 @@ class TestProcess(PsutilTestCase):
self.assertEqual(gids.saved, 1006)
self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
def test_connections_enametoolong(self):
# Simulate a case where /proc/{pid}/fd/{fd} symlink points to
# a file with full path longer than PATH_MAX, see:
# https://github.com/giampaolo/psutil/issues/1940
with mock.patch('psutil._pslinux.os.readlink',
side_effect=OSError(errno.ENAMETOOLONG, "")) as m:
p = psutil.Process()
assert not p.connections()
assert m.called
@unittest.skipIf(not LINUX, "LINUX only")
class TestProcessAgainstStatus(PsutilTestCase):
@@ -16,6 +16,7 @@ because of how its JIT handles memory, so tests are skipped.
"""
from __future__ import print_function
import functools
import os
@@ -29,8 +30,6 @@ from psutil import SUNOS
from psutil import WINDOWS
from psutil._compat import ProcessLookupError
from psutil._compat import super
from psutil.tests import create_sockets
from psutil.tests import get_testfn
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_ENVIRON
@@ -43,12 +42,14 @@ from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import TestMemoryLeak
from psutil.tests import create_sockets
from psutil.tests import get_testfn
from psutil.tests import process_namespace
from psutil.tests import skip_on_access_denied
from psutil.tests import spawn_testproc
from psutil.tests import system_namespace
from psutil.tests import terminate
from psutil.tests import TestMemoryLeak
from psutil.tests import unittest
@@ -347,7 +348,7 @@ class TestModuleFunctionsLeaks(TestMemoryLeak):
self.execute(lambda: psutil.cpu_count(logical=True))
@fewtimes_if_linux()
def test_cpu_count_physical(self):
def test_cpu_count_cores(self):
self.execute(lambda: psutil.cpu_count(logical=False))
@fewtimes_if_linux()
@@ -456,6 +457,9 @@ class TestModuleFunctionsLeaks(TestMemoryLeak):
def test_users(self):
self.execute(psutil.users)
def test_set_debug(self):
self.execute(lambda: psutil._set_debug(False))
if WINDOWS:
# --- win services
@@ -17,16 +17,19 @@ import os
import pickle
import socket
import stat
import sys
import psutil
import psutil.tests
from psutil import LINUX
from psutil import POSIX
from psutil import WINDOWS
from psutil._common import debug
from psutil._common import memoize
from psutil._common import memoize_when_activated
from psutil._common import supports_ipv6
from psutil._common import wrap_numbers
from psutil._compat import PY3
from psutil._compat import redirect_stderr
from psutil.tests import APPVEYOR
from psutil.tests import CI_TESTING
from psutil.tests import HAS_BATTERY
@@ -35,20 +38,15 @@ from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import import_module_by_path
from psutil.tests import mock
from psutil.tests import PsutilTestCase
from psutil.tests import PYTHON_EXE
from psutil.tests import reload_module
from psutil.tests import ROOT_DIR
from psutil.tests import SCRIPTS_DIR
from psutil.tests import PsutilTestCase
from psutil.tests import import_module_by_path
from psutil.tests import mock
from psutil.tests import reload_module
from psutil.tests import sh
from psutil.tests import unittest
import psutil
import psutil.tests
PYTHON_39 = sys.version_info[:2] == (3, 9)
# ===================================================================
@@ -97,57 +95,71 @@ class TestMisc(PsutilTestCase):
def test_process__str__(self):
self.test_process__repr__(func=str)
def test_no_such_process__repr__(self, func=repr):
def test_no_such_process__repr__(self):
self.assertEqual(
repr(psutil.NoSuchProcess(321)),
"psutil.NoSuchProcess process no longer exists (pid=321)")
"psutil.NoSuchProcess(pid=321, msg='process no longer exists')")
self.assertEqual(
repr(psutil.NoSuchProcess(321, name='foo')),
"psutil.NoSuchProcess process no longer exists (pid=321, "
"name='foo')")
self.assertEqual(
repr(psutil.NoSuchProcess(321, msg='foo')),
"psutil.NoSuchProcess foo")
repr(psutil.NoSuchProcess(321, name="name", msg="msg")),
"psutil.NoSuchProcess(pid=321, name='name', msg='msg')")
def test_zombie_process__repr__(self, func=repr):
def test_no_such_process__str__(self):
self.assertEqual(
str(psutil.NoSuchProcess(321)),
"process no longer exists (pid=321)")
self.assertEqual(
str(psutil.NoSuchProcess(321, name="name", msg="msg")),
"msg (pid=321, name='name')")
def test_zombie_process__repr__(self):
self.assertEqual(
repr(psutil.ZombieProcess(321)),
"psutil.ZombieProcess process still exists but it's a zombie "
"(pid=321)")
'psutil.ZombieProcess(pid=321, msg="PID still '
'exists but it\'s a zombie")')
self.assertEqual(
repr(psutil.ZombieProcess(321, name='foo')),
"psutil.ZombieProcess process still exists but it's a zombie "
"(pid=321, name='foo')")
self.assertEqual(
repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
"psutil.ZombieProcess process still exists but it's a zombie "
"(pid=321, name='foo', ppid=1)")
self.assertEqual(
repr(psutil.ZombieProcess(321, msg='foo')),
"psutil.ZombieProcess foo")
repr(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo")),
"psutil.ZombieProcess(pid=321, ppid=320, name='name', msg='foo')")
def test_access_denied__repr__(self, func=repr):
def test_zombie_process__str__(self):
self.assertEqual(
str(psutil.ZombieProcess(321)),
"PID still exists but it's a zombie (pid=321)")
self.assertEqual(
str(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo")),
"foo (pid=321, ppid=320, name='name')")
def test_access_denied__repr__(self):
self.assertEqual(
repr(psutil.AccessDenied(321)),
"psutil.AccessDenied (pid=321)")
"psutil.AccessDenied(pid=321)")
self.assertEqual(
repr(psutil.AccessDenied(321, name='foo')),
"psutil.AccessDenied (pid=321, name='foo')")
self.assertEqual(
repr(psutil.AccessDenied(321, msg='foo')),
"psutil.AccessDenied foo")
repr(psutil.AccessDenied(321, name="name", msg="msg")),
"psutil.AccessDenied(pid=321, name='name', msg='msg')")
def test_timeout_expired__repr__(self, func=repr):
def test_access_denied__str__(self):
self.assertEqual(
repr(psutil.TimeoutExpired(321)),
"psutil.TimeoutExpired timeout after 321 seconds")
str(psutil.AccessDenied(321)),
"(pid=321)")
self.assertEqual(
repr(psutil.TimeoutExpired(321, pid=111)),
"psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
str(psutil.AccessDenied(321, name="name", msg="msg")),
"msg (pid=321, name='name')")
def test_timeout_expired__repr__(self):
self.assertEqual(
repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
"psutil.TimeoutExpired timeout after 321 seconds "
"(pid=111, name='foo')")
repr(psutil.TimeoutExpired(5)),
"psutil.TimeoutExpired(seconds=5, msg='timeout after 5 seconds')")
self.assertEqual(
repr(psutil.TimeoutExpired(5, pid=321, name="name")),
"psutil.TimeoutExpired(pid=321, name='name', seconds=5, "
"msg='timeout after 5 seconds')")
def test_timeout_expired__str__(self):
self.assertEqual(
str(psutil.TimeoutExpired(5)),
"timeout after 5 seconds")
self.assertEqual(
str(psutil.TimeoutExpired(5, pid=321, name="name")),
"timeout after 5 seconds (pid=321, name='name')")
def test_process__eq__(self):
p1 = psutil.Process()
@@ -354,6 +366,8 @@ class TestMisc(PsutilTestCase):
check(psutil.disk_usage(os.getcwd()))
check(psutil.users())
# XXX: https://github.com/pypa/setuptools/pull/2896
@unittest.skipIf(APPVEYOR, "temporarily disabled due to setuptools bug")
def test_setup_script(self):
setup_py = os.path.join(ROOT_DIR, 'setup.py')
if CI_TESTING and not os.path.exists(setup_py):
@@ -387,6 +401,35 @@ class TestMisc(PsutilTestCase):
reload_module(psutil)
self.assertIn("version conflict", str(cm.exception).lower())
def test_debug(self):
if PY3:
from io import StringIO
else:
from StringIO import StringIO
with redirect_stderr(StringIO()) as f:
debug("hello")
msg = f.getvalue()
assert msg.startswith("psutil-debug"), msg
self.assertIn("hello", msg)
self.assertIn(__file__.replace('.pyc', '.py'), msg)
# supposed to use repr(exc)
with redirect_stderr(StringIO()) as f:
debug(ValueError("this is an error"))
msg = f.getvalue()
self.assertIn("ignoring ValueError", msg)
self.assertIn("'this is an error'", msg)
# supposed to use str(exc), because of extra info about file name
with redirect_stderr(StringIO()) as f:
exc = OSError(2, "no such file")
exc.filename = "/foo"
debug(exc)
msg = f.getvalue()
self.assertIn("no such file", msg)
self.assertIn("/foo", msg)
# ===================================================================
# --- Tests for wrap_numbers() function.
@@ -679,11 +722,12 @@ class TestScripts(PsutilTestCase):
@unittest.skipIf(not POSIX, "POSIX only")
def test_executable(self):
for name in os.listdir(SCRIPTS_DIR):
if name.endswith('.py'):
path = os.path.join(SCRIPTS_DIR, name)
if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
self.fail('%r is not executable' % path)
for root, dirs, files in os.walk(SCRIPTS_DIR):
for file in files:
if file.endswith('.py'):
path = os.path.join(root, file)
if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
raise self.fail('%r is not executable' % path)
def test_disk_usage(self):
self.assert_stdout('disk_usage.py')
@@ -11,17 +11,22 @@ import time
import psutil
from psutil import MACOS
from psutil import POSIX
from psutil.tests import HAS_BATTERY
from psutil.tests import TOLERANCE_DISK_USAGE
from psutil.tests import TOLERANCE_SYS_MEM
from psutil.tests import PsutilTestCase
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import spawn_testproc
from psutil.tests import terminate
from psutil.tests import TOLERANCE_DISK_USAGE
from psutil.tests import TOLERANCE_SYS_MEM
from psutil.tests import unittest
if POSIX:
from psutil._psutil_posix import getpagesize
def sysctl(cmdline):
"""Expects a sysctl command with an argument and parse the result
returning only the value of interest.
@@ -36,8 +41,6 @@ def sysctl(cmdline):
def vm_stat(field):
"""Wrapper around 'vm_stat' cmdline utility."""
from psutil._psutil_posix import getpagesize
out = sh('vm_stat')
for line in out.split('\n'):
if field in line:
@@ -137,7 +140,7 @@ class TestSystemAPIs(PsutilTestCase):
num = sysctl("sysctl hw.logicalcpu")
self.assertEqual(num, psutil.cpu_count(logical=True))
def test_cpu_count_physical(self):
def test_cpu_count_cores(self):
num = sysctl("sysctl hw.physicalcpu")
self.assertEqual(num, psutil.cpu_count(logical=False))
@@ -23,18 +23,19 @@ from psutil import OPENBSD
from psutil import POSIX
from psutil import SUNOS
from psutil.tests import CI_TESTING
from psutil.tests import spawn_testproc
from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import mock
from psutil.tests import PsutilTestCase
from psutil.tests import PYTHON_EXE
from psutil.tests import PsutilTestCase
from psutil.tests import mock
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import skip_on_access_denied
from psutil.tests import spawn_testproc
from psutil.tests import terminate
from psutil.tests import unittest
from psutil.tests import which
if POSIX:
import mmap
import resource
@@ -21,7 +21,6 @@ import time
import types
import psutil
from psutil import AIX
from psutil import BSD
from psutil import LINUX
@@ -33,15 +32,12 @@ from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._common import open_text
from psutil._compat import PY3
from psutil._compat import FileNotFoundError
from psutil._compat import long
from psutil._compat import PY3
from psutil._compat import super
from psutil.tests import APPVEYOR
from psutil.tests import call_until
from psutil.tests import CI_TESTING
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import GLOBAL_TIMEOUT
from psutil.tests import HAS_CPU_AFFINITY
@@ -52,17 +48,20 @@ from psutil.tests import HAS_PROC_CPU_NUM
from psutil.tests import HAS_PROC_IO_COUNTERS
from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_THREADS
from psutil.tests import mock
from psutil.tests import process_namespace
from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import PYTHON_EXE
from psutil.tests import PsutilTestCase
from psutil.tests import ThreadTask
from psutil.tests import call_until
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
from psutil.tests import mock
from psutil.tests import process_namespace
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import skip_on_access_denied
from psutil.tests import skip_on_not_implemented
from psutil.tests import ThreadTask
from psutil.tests import unittest
from psutil.tests import wait_for_pid
@@ -718,6 +717,12 @@ class TestProcess(PsutilTestCase):
if NETBSD or OPENBSD or AIX:
self.assertEqual(p.cmdline()[0], PYTHON_EXE)
else:
if MACOS and CI_TESTING:
pyexe = p.cmdline()[0]
if pyexe != PYTHON_EXE:
self.assertEqual(' '.join(p.cmdline()[1:]),
' '.join(cmdline[1:]))
return
self.assertEqual(' '.join(p.cmdline()), ' '.join(cmdline))
@unittest.skipIf(PYPY, "broken on PYPY")
@@ -1326,6 +1331,20 @@ class TestProcess(PsutilTestCase):
self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
assert m.called
def test_reused_pid(self):
# Emulate a case where PID has been reused by another process.
subp = self.spawn_testproc()
p = psutil.Process(subp.pid)
p._ident = (p.pid, p.create_time() + 100)
assert not p.is_running()
assert p != psutil.Process(subp.pid)
msg = "process no longer exists and its PID has been reused"
self.assertRaisesRegex(psutil.NoSuchProcess, msg, p.suspend)
self.assertRaisesRegex(psutil.NoSuchProcess, msg, p.resume)
self.assertRaisesRegex(psutil.NoSuchProcess, msg, p.terminate)
self.assertRaisesRegex(psutil.NoSuchProcess, msg, p.kill)
self.assertRaisesRegex(psutil.NoSuchProcess, msg, p.children)
def test_pid_0(self):
# Process(0) is supposed to work on all platforms except Linux
if 0 not in psutil.pids():
@@ -1369,7 +1388,6 @@ class TestProcess(PsutilTestCase):
def test_environ(self):
def clean_dict(d):
# Most of these are problematic on Travis.
d.pop("PSUTIL_TESTING", None)
d.pop("PLAT", None)
d.pop("HOME", None)
if MACOS:
@@ -1395,11 +1413,13 @@ class TestProcess(PsutilTestCase):
code = textwrap.dedent("""
#include <unistd.h>
#include <fcntl.h>
char * const argv[] = {"cat", 0};
char * const envp[] = {"A=1", "X", "C=3", 0};
int main(void) {
/* Close stderr on exec so parent can wait for the execve to
* finish. */
// Close stderr on exec so parent can wait for the
// execve to finish.
if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
return 0;
return execve("/bin/cat", argv, envp);
@@ -31,10 +31,9 @@ from psutil import WINDOWS
from psutil._compat import FileNotFoundError
from psutil._compat import long
from psutil.tests import ASCII_FS
from psutil.tests import check_net_address
from psutil.tests import CI_TESTING
from psutil.tests import DEVNULL
from psutil.tests import enum
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import GLOBAL_TIMEOUT
from psutil.tests import HAS_BATTERY
from psutil.tests import HAS_CPU_FREQ
@@ -44,12 +43,13 @@ from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import IS_64BIT
from psutil.tests import mock
from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import retry_on_failure
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import UNICODE_SUFFIX
from psutil.tests import PsutilTestCase
from psutil.tests import check_net_address
from psutil.tests import enum
from psutil.tests import mock
from psutil.tests import retry_on_failure
from psutil.tests import unittest
@@ -316,16 +316,16 @@ class TestCpuAPIs(PsutilTestCase):
if "physical id" not in cpuinfo_data:
raise unittest.SkipTest("cpuinfo doesn't include physical id")
def test_cpu_count_physical(self):
def test_cpu_count_cores(self):
logical = psutil.cpu_count()
physical = psutil.cpu_count(logical=False)
if physical is None:
raise self.skipTest("physical cpu_count() is None")
cores = psutil.cpu_count(logical=False)
if cores is None:
raise self.skipTest("cpu_count_cores() is None")
if WINDOWS and sys.getwindowsversion()[:2] <= (6, 1): # <= Vista
self.assertIsNone(physical)
self.assertIsNone(cores)
else:
self.assertGreaterEqual(physical, 1)
self.assertGreaterEqual(logical, physical)
self.assertGreaterEqual(cores, 1)
self.assertGreaterEqual(logical, cores)
def test_cpu_count_none(self):
# https://github.com/giampaolo/psutil/issues/1085
@@ -334,7 +334,7 @@ class TestCpuAPIs(PsutilTestCase):
return_value=val) as m:
self.assertIsNone(psutil.cpu_count())
assert m.called
with mock.patch('psutil._psplatform.cpu_count_physical',
with mock.patch('psutil._psplatform.cpu_count_cores',
return_value=val) as m:
self.assertIsNone(psutil.cpu_count(logical=False))
assert m.called
@@ -17,25 +17,28 @@ import socket
import stat
import subprocess
import psutil
import psutil.tests
from psutil import FREEBSD
from psutil import NETBSD
from psutil import POSIX
from psutil._common import open_binary
from psutil._common import open_text
from psutil._common import supports_ipv6
from psutil.tests import CI_TESTING
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import PYTHON_EXE
from psutil.tests import PsutilTestCase
from psutil.tests import TestMemoryLeak
from psutil.tests import bind_socket
from psutil.tests import bind_unix_socket
from psutil.tests import call_until
from psutil.tests import chdir
from psutil.tests import CI_TESTING
from psutil.tests import create_sockets
from psutil.tests import get_free_port
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import is_namedtuple
from psutil.tests import mock
from psutil.tests import process_namespace
from psutil.tests import PsutilTestCase
from psutil.tests import PYTHON_EXE
from psutil.tests import reap_children
from psutil.tests import retry
from psutil.tests import retry_on_failure
@@ -45,13 +48,11 @@ from psutil.tests import serialrun
from psutil.tests import system_namespace
from psutil.tests import tcp_socketpair
from psutil.tests import terminate
from psutil.tests import TestMemoryLeak
from psutil.tests import unittest
from psutil.tests import unix_socketpair
from psutil.tests import wait_for_file
from psutil.tests import wait_for_pid
import psutil
import psutil.tests
# ===================================================================
# --- Unit tests for test utilities.
@@ -350,6 +351,7 @@ class TestNetUtils(PsutilTestCase):
@serialrun
class TestMemLeakClass(TestMemoryLeak):
@retry_on_failure()
def test_times(self):
def fun():
cnt['cnt'] += 1
@@ -79,6 +79,7 @@ import traceback
import warnings
from contextlib import closing
import psutil
from psutil import BSD
from psutil import OPENBSD
from psutil import POSIX
@@ -87,28 +88,27 @@ from psutil._compat import PY3
from psutil._compat import u
from psutil.tests import APPVEYOR
from psutil.tests import ASCII_FS
from psutil.tests import bind_unix_socket
from psutil.tests import chdir
from psutil.tests import CI_TESTING
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import INVALID_UNICODE_SUFFIX
from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import TESTFN_PREFIX
from psutil.tests import UNICODE_SUFFIX
from psutil.tests import PsutilTestCase
from psutil.tests import bind_unix_socket
from psutil.tests import chdir
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
from psutil.tests import get_testfn
from psutil.tests import safe_mkdir
from psutil.tests import safe_rmpath
from psutil.tests import serialrun
from psutil.tests import skip_on_access_denied
from psutil.tests import spawn_testproc
from psutil.tests import terminate
from psutil.tests import TESTFN_PREFIX
from psutil.tests import UNICODE_SUFFIX
from psutil.tests import unittest
import psutil
if APPVEYOR:
@@ -27,15 +27,15 @@ from psutil.tests import APPVEYOR
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import HAS_BATTERY
from psutil.tests import IS_64BIT
from psutil.tests import mock
from psutil.tests import PsutilTestCase
from psutil.tests import PY3
from psutil.tests import PYPY
from psutil.tests import TOLERANCE_DISK_USAGE
from psutil.tests import PsutilTestCase
from psutil.tests import mock
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import spawn_testproc
from psutil.tests import terminate
from psutil.tests import TOLERANCE_DISK_USAGE
from psutil.tests import unittest
@@ -47,24 +47,13 @@ if WINDOWS and not PYPY:
import win32process
import wmi # requires "pip install wmi" / "make setup-dev-env"
if WINDOWS:
from psutil._pswindows import convert_oserror
cext = psutil._psplatform.cext
def wrap_exceptions(fun):
def wrapper(self, *args, **kwargs):
try:
return fun(self, *args, **kwargs)
except OSError as err:
from psutil._pswindows import ACCESS_DENIED_SET
if err.errno in ACCESS_DENIED_SET:
raise psutil.AccessDenied(None, None)
if err.errno == errno.ESRCH:
raise psutil.NoSuchProcess(None, None)
raise
return wrapper
@unittest.skipIf(not WINDOWS, "WINDOWS only")
@unittest.skipIf(PYPY, "pywin32 not available on PYPY")
# https://github.com/giampaolo/psutil/pull/1762#issuecomment-632892692
@@ -100,7 +89,7 @@ class TestCpuAPIs(WindowsTestCase):
proc = w.Win32_Processor()[0]
self.assertEqual(psutil.cpu_count(), proc.NumberOfLogicalProcessors)
def test_cpu_count_phys_vs_wmi(self):
def test_cpu_count_cores_vs_wmi(self):
w = wmi.WMI()
proc = w.Win32_Processor()[0]
self.assertEqual(psutil.cpu_count(logical=False), proc.NumberOfCores)
@@ -633,7 +622,6 @@ class TestDualProcessImplementation(PsutilTestCase):
assert fun.called
def test_cmdline(self):
from psutil._pswindows import convert_oserror
for pid in psutil.pids():
try:
a = cext.proc_cmdline(pid, use_peb=True)
@@ -725,7 +713,7 @@ class RemoteProcessTestCase(PsutilTestCase):
p = psutil.Process(self.proc32.pid)
e = p.environ()
self.assertIn("THINK_OF_A_NUMBER", e)
self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid()))
self.assertEqual(e["THINK_OF_A_NUMBER"], str(os.getpid()))
def test_environ_64(self):
p = psutil.Process(self.proc64.pid)