From 389a669bee1fc46cade96f77ce4265d232c352f8 Mon Sep 17 00:00:00 2001 From: Michael Peters Date: Tue, 31 Jan 2023 17:22:28 -0800 Subject: [PATCH] add --disable-debugger-detection flag --- README.rst | 4 ++ pytest_timeout.py | 94 +++++++++++++++++++++++++++++++++--------- test_pytest_timeout.py | 88 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 167 insertions(+), 19 deletions(-) diff --git a/README.rst b/README.rst index 7a5448b..df3bcf3 100644 --- a/README.rst +++ b/README.rst @@ -241,6 +241,10 @@ check to see if the module it belongs to is present in a set of known debugging frameworks modules OR if pytest itself drops you into a pdb session using ``--pdb`` or similar. +This functionality can be disabled with the ``--disable-debugger-detection`` flag +or the corresponding ``timeout_disable_debugger_detection`` ini setting / environment +variable. + Extending pytest-timeout with plugins ===================================== diff --git a/pytest_timeout.py b/pytest_timeout.py index 2bb4f0b..6177db0 100644 --- a/pytest_timeout.py +++ b/pytest_timeout.py @@ -40,11 +40,17 @@ function body, ignoring the time it takes when evaluating any fixtures used in the test. """.strip() +DISABLE_DEBUGGER_DETECTION_DESC = """ +When specified, disables debugger detection. breakpoint(), pdb.set_trace(), etc. +will be interrupted. +""".strip() # bdb covers pdb, ipdb, and possibly others # pydevd covers PyCharm, VSCode, and possibly others KNOWN_DEBUGGING_MODULES = {"pydevd", "bdb", "pydevd_frame_evaluator"} -Settings = namedtuple("Settings", ["timeout", "method", "func_only"]) +Settings = namedtuple( + "Settings", ["timeout", "method", "func_only", "disable_debugger_detection"] +) @pytest.hookimpl @@ -68,9 +74,21 @@ def pytest_addoption(parser): choices=["signal", "thread"], help=METHOD_DESC, ) + group.addoption( + "--disable-debugger-detection", + dest="timeout_disable_debugger_detection", + action="store_true", + help=DISABLE_DEBUGGER_DETECTION_DESC, + ) parser.addini("timeout", TIMEOUT_DESC) parser.addini("timeout_method", METHOD_DESC) parser.addini("timeout_func_only", FUNC_ONLY_DESC, type="bool", default=False) + parser.addini( + "timeout_disable_debugger_detection", + DISABLE_DEBUGGER_DETECTION_DESC, + type="bool", + default=False, + ) class TimeoutHooks: @@ -107,19 +125,24 @@ def pytest_configure(config): """Register the marker so it shows up in --markers output.""" config.addinivalue_line( "markers", - "timeout(timeout, method=None, func_only=False): Set a timeout, timeout " + "timeout(timeout, method=None, func_only=False, " + "disable_debugger_detection=False): Set a timeout, timeout " "method and func_only evaluation on just one test item. The first " "argument, *timeout*, is the timeout in seconds while the keyword, " - "*method*, takes the same values as the --timeout_method option. The " + "*method*, takes the same values as the --timeout-method option. The " "*func_only* keyword, when set to True, defers the timeout evaluation " "to only the test function body, ignoring the time it takes when " - "evaluating any fixtures used in the test.", + "evaluating any fixtures used in the test. The " + "*disable_debugger_detection* keyword, when set to True, disables " + "debugger detection, allowing breakpoint(), pdb.set_trace(), etc. " + "to be interrupted", ) settings = get_env_settings(config) config._env_timeout = settings.timeout config._env_timeout_method = settings.method config._env_timeout_func_only = settings.func_only + config._env_timeout_disable_debugger_detection = settings.disable_debugger_detection @pytest.hookimpl(hookwrapper=True) @@ -238,7 +261,7 @@ def pytest_timeout_set_timer(item, settings): def handler(signum, frame): __tracebackhide__ = True - timeout_sigalrm(item, settings.timeout) + timeout_sigalrm(item, settings) def cancel(): signal.setitimer(signal.ITIMER_REAL, 0) @@ -248,9 +271,7 @@ def cancel(): signal.signal(signal.SIGALRM, handler) signal.setitimer(signal.ITIMER_REAL, settings.timeout) elif timeout_method == "thread": - timer = threading.Timer( - settings.timeout, timeout_timer, (item, settings.timeout) - ) + timer = threading.Timer(settings.timeout, timeout_timer, (item, settings)) timer.name = "%s %s" % (__name__, item.nodeid) def cancel(): @@ -299,12 +320,21 @@ def get_env_settings(config): method = DEFAULT_METHOD func_only = config.getini("timeout_func_only") - return Settings(timeout, method, func_only) + + disable_debugger_detection = config.getvalue("timeout_disable_debugger_detection") + if disable_debugger_detection is None: + ini = config.getini("timeout_disable_debugger_detection") + if ini: + disable_debugger_detection = _validate_disable_debugger_detection( + ini, "config file" + ) + + return Settings(timeout, method, func_only, disable_debugger_detection) def _get_item_settings(item, marker=None): """Return (timeout, method) for an item.""" - timeout = method = func_only = None + timeout = method = func_only = disable_debugger_detection = None if not marker: marker = item.get_closest_marker("timeout") if marker is not None: @@ -312,13 +342,18 @@ def _get_item_settings(item, marker=None): timeout = _validate_timeout(settings.timeout, "marker") method = _validate_method(settings.method, "marker") func_only = _validate_func_only(settings.func_only, "marker") + disable_debugger_detection = _validate_disable_debugger_detection( + settings.disable_debugger_detection, "marker" + ) if timeout is None: timeout = item.config._env_timeout if method is None: method = item.config._env_timeout_method if func_only is None: func_only = item.config._env_timeout_func_only - return Settings(timeout, method, func_only) + if disable_debugger_detection is None: + disable_debugger_detection = item.config._env_timeout_disable_debugger_detection + return Settings(timeout, method, func_only, disable_debugger_detection) def _parse_marker(marker): @@ -329,7 +364,7 @@ def _parse_marker(marker): """ if not marker.args and not marker.kwargs: raise TypeError("Timeout marker must have at least one argument") - timeout = method = func_only = NOTSET = object() + timeout = method = func_only = disable_debugger_detection = NOTSET = object() for kw, val in marker.kwargs.items(): if kw == "timeout": timeout = val @@ -337,6 +372,8 @@ def _parse_marker(marker): method = val elif kw == "func_only": func_only = val + elif kw == "disable_debugger_detection": + disable_debugger_detection = val else: raise TypeError("Invalid keyword argument for timeout marker: %s" % kw) if len(marker.args) >= 1 and timeout is not NOTSET: @@ -347,7 +384,13 @@ def _parse_marker(marker): raise TypeError("Multiple values for method argument of timeout marker") elif len(marker.args) >= 2: method = marker.args[1] - if len(marker.args) > 2: + if len(marker.args) >= 3 and disable_debugger_detection is not NOTSET: + raise TypeError( + "Multiple values for disable_debugger_detection argument of timeout marker" + ) + elif len(marker.args) >= 3: + disable_debugger_detection = marker.args[2] + if len(marker.args) > 3: raise TypeError("Too many arguments for timeout marker") if timeout is NOTSET: timeout = None @@ -355,7 +398,9 @@ def _parse_marker(marker): method = None if func_only is NOTSET: func_only = None - return Settings(timeout, method, func_only) + if disable_debugger_detection is NOTSET: + disable_debugger_detection = None + return Settings(timeout, method, func_only, disable_debugger_detection) def _validate_timeout(timeout, where): @@ -383,14 +428,25 @@ def _validate_func_only(func_only, where): return func_only -def timeout_sigalrm(item, timeout): +def _validate_disable_debugger_detection(disable_debugger_detection, where): + if disable_debugger_detection is None: + return None + if not isinstance(disable_debugger_detection, bool): + raise ValueError( + "Invalid disable_debugger_detection value %s from %s" + % (disable_debugger_detection, where) + ) + return disable_debugger_detection + + +def timeout_sigalrm(item, settings): """Dump stack of threads and raise an exception. This will output the stacks of any threads other then the current to stderr and then raise an AssertionError, thus terminating the test. """ - if is_debugging(): + if not settings.disable_debugger_detection and is_debugging(): return __tracebackhide__ = True nthreads = len(threading.enumerate()) @@ -399,16 +455,16 @@ def timeout_sigalrm(item, timeout): dump_stacks() if nthreads > 1: write_title("Timeout", sep="+") - pytest.fail("Timeout >%ss" % timeout) + pytest.fail("Timeout >%ss" % settings.timeout) -def timeout_timer(item, timeout): +def timeout_timer(item, settings): """Dump stack of threads and call os._exit(). This disables the capturemanager and dumps stdout and stderr. Then the stacks are dumped and os._exit(1) is called. """ - if is_debugging(): + if not settings.disable_debugger_detection and is_debugging(): return try: capman = item.config.pluginmanager.getplugin("capturemanager") diff --git a/test_pytest_timeout.py b/test_pytest_timeout.py index 096e57c..1db4da7 100644 --- a/test_pytest_timeout.py +++ b/test_pytest_timeout.py @@ -486,6 +486,94 @@ def test_foo(): assert "fail" not in result +@pytest.mark.parametrize( + ["debugging_module", "debugging_set_trace"], + [ + ("pdb", "set_trace()"), + pytest.param( + "ipdb", + "set_trace()", + marks=pytest.mark.xfail( + reason="waiting on https://github.com/pytest-dev/pytest/pull/7207" + " to allow proper testing" + ), + ), + pytest.param( + "pydevd", + "settrace(port=4678)", + marks=pytest.mark.xfail(reason="in need of way to setup pydevd server"), + ), + ], +) +@have_spawn +def test_disable_debugger_detection_flag( + testdir, debugging_module, debugging_set_trace +): + p1 = testdir.makepyfile( + """ + import pytest, {debugging_module} + + @pytest.mark.timeout(1) + def test_foo(): + {debugging_module}.{debugging_set_trace} + """.format( + debugging_module=debugging_module, debugging_set_trace=debugging_set_trace + ) + ) + child = testdir.spawn_pytest(f"{p1} --disable-debugger-detection") + child.expect("test_foo") + time.sleep(1.2) + result = child.read().decode().lower() + if child.isalive(): + child.terminate(force=True) + assert "timeout >1.0s" in result + assert "fail" in result + + +@pytest.mark.parametrize( + ["debugging_module", "debugging_set_trace"], + [ + ("pdb", "set_trace()"), + pytest.param( + "ipdb", + "set_trace()", + marks=pytest.mark.xfail( + reason="waiting on https://github.com/pytest-dev/pytest/pull/7207" + " to allow proper testing" + ), + ), + pytest.param( + "pydevd", + "settrace(port=4678)", + marks=pytest.mark.xfail(reason="in need of way to setup pydevd server"), + ), + ], +) +@have_spawn +def test_disable_debugger_detection_marker( + testdir, debugging_module, debugging_set_trace +): + p1 = testdir.makepyfile( + """ + import pytest, {debugging_module} + + @pytest.mark.timeout(1, disable_debugger_detection=True) + def test_foo(): + {debugging_module}.{debugging_set_trace} + """.format( + debugging_module=debugging_module, debugging_set_trace=debugging_set_trace + ) + ) + child = testdir.spawn_pytest(str(p1)) + child.expect("test_foo") + time.sleep(1.2) + result = child.read().decode().lower() + if child.isalive(): + child.terminate(force=True) + assert "timeout >1.0s" in result + assert "fail" in result + + def test_is_debugging(monkeypatch): import pytest_timeout