Enh/error on timeout #683

Open
wants to merge 4 commits into
base: master
18 changes: 16 additions & 2 deletions amlb/utils/process.py
@@ -49,6 +49,10 @@ def file_lock(path, timeout=-1):
         yield


+class StaleProcessError(subprocess.TimeoutExpired):
+    pass
+
+
 def run_subprocess(
     *popenargs,
     input=None,
@@ -104,7 +108,16 @@ def communicate(process, input=None, timeout=None):
         except:  # also handles kb interrupts
             process.kill()
             raise
+
         retcode = process.poll()
+        if retcode is None:
Collaborator Author

As far as I can tell, this is a reliable way to check whether the process is still running. If it is still running at this point, I think the only possible reason is that the `communicate` function returned early, which should only happen on an activity timeout.

Collaborator

Hi @PGijsbers, I am not very familiar with this section of the code base but the logic looks reasonable to me.

Do I understand correctly that after the previous try/except block has been completed, the process should not be running, and this code block ensures that this is the case?

Collaborator Author

The `stdout, stderr = communicate(process, input, timeout=timeout)` line blocks until no more stdout or stderr output is detected. A lack of output means that either there was an activity timeout (nothing was written during the specified interval) or the process stopped. The communication logic doesn't detect which.
That's why at this level, after communication has stopped, we can do an extra check (this one) to see whether the process is alive. If it is still alive, we can infer that communication stopped because a timeout was detected. In that case the subprocess wouldn't stop by itself, so we send the kill signal and raise an error, because a normal return from this function would indicate that the process finished successfully.
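
The check described in this comment can be sketched in isolation. This is a minimal, non-Windows illustration and not the amlb implementation: `read_until_quiet`, `run_checked`, and the `grace` parameter are hypothetical names, and the sketch waits for a short grace period instead of calling `poll()` once, so that a process which has just closed its pipe has time to exit.

```python
import os
import select
import subprocess
import sys


class StaleProcessError(subprocess.TimeoutExpired):
    """A live process produced no output within the activity timeout."""


def read_until_quiet(process, activity_timeout):
    """Read stdout until EOF or until nothing arrives for `activity_timeout` seconds.

    The return value does not say which of the two happened.
    """
    fd = process.stdout.fileno()
    chunks = []
    while True:
        # Non-Windows only: wait until the pipe is readable or the timeout elapses.
        ready, _, _ = select.select([fd], [], [], activity_timeout)
        if not ready:
            break  # nothing was written during the interval
        data = os.read(fd, 4096)
        if not data:
            break  # end of file: the process closed its stdout
        chunks.append(data)
    return b"".join(chunks)


def run_checked(cmd, activity_timeout, grace=1.0):
    """Hypothetical wrapper: raise StaleProcessError if the process outlives its output."""
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as process:
        output = read_until_quiet(process, activity_timeout)
        try:
            # Assumption of this sketch: a short grace period rather than a single poll().
            process.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            # Still alive, so reading stopped because of the activity timeout.
            process.kill()
            process.wait()
            raise StaleProcessError(process.args, activity_timeout, output=output) from None
    return output


if __name__ == "__main__":
    print(run_checked([sys.executable, "-c", "print('hi')"], activity_timeout=10))
```

Because `StaleProcessError` subclasses `subprocess.TimeoutExpired`, callers that already handle `TimeoutExpired` would also catch it.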

Collaborator Author

And also thanks so much for doing the review anyway :) I understand generally speaking people aren't too familiar with the internals (to be frank, I also had to brush up on some of it as Seb wrote this), I just appreciate a sanity check. Both for correctness and to avoid me refactoring/solving things in a way that only I will be able to understand later.

Collaborator

Thanks for the explanation, that makes sense. I would maybe add a small comment indicating the two possible reasons why `retcode` is `None`; at least for me that would be helpful to understand the purpose of this block.

+            # Process still lives => communication stopped because of activity timeout
+            process.kill()
+            process.wait()
+            raise StaleProcessError(
+                process.args, float("nan"), output=stdout, stderr=stderr
+            )
+
         if check and retcode:
             raise subprocess.CalledProcessError(
                 retcode, process.args, output=stdout, stderr=stderr
@@ -186,6 +199,9 @@ def read_pipe(pipe, timeout):
         pipes = as_list(pipe)
         # wait until a pipe is ready for reading, non-Windows only.
         ready, *_ = select.select(pipes, [], [], timeout)
+        # if a pipe is not ready it could be timeout or it could be end of process
+        # so at this point we do not know. Only after the communicate function is over do we know.
+        # i.e., if the process is still running it does not have a retcode.
Comment on lines +202 to +204
Collaborator Author

Preferably I would have raised the activity timeout from the function which uses it. But at this stage we can unfortunately not detect whether the error should be raised.
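
A small, non-Windows sketch of why little can be decided at this level: when its timeout expires, `select.select` returns empty lists and nothing else, so the caller learns that no pipe became ready but nothing about the state of the process. The pipe here is a stand-in created with `os.pipe()` and is not connected to any subprocess.

```python
import os
import select

read_fd, write_fd = os.pipe()

# Nothing has been written yet, so select gives up after the timeout
# and reports no ready file descriptors.
ready, _, _ = select.select([read_fd], [], [], 0.05)
idle_result = list(ready)

# Once data is available, the read end is reported as ready.
os.write(write_fd, b"hi")
ready, _, _ = select.select([read_fd], [], [], 0.05)
active_result = list(ready)

os.close(read_fd)
os.close(write_fd)
```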

         reads = [""] * len(pipes)
         # print update for each pipe that is ready for reading
         for i, p in enumerate(pipes):
@@ -315,8 +331,6 @@ def communicate(*args, **kwargs):
             log.log(params.output_level, e.stdout)
         if e.stderr:
             log.log(params.error_level, e.stderr)
-        # error_tail = tail(e.stderr, 25) if e.stderr else 'Unknown Error'
-        # raise subprocess.SubprocessError("Error when running command `{cmd}`: {error}".format(cmd=full_cmd, error=error_tail))
         raise e


1 change: 1 addition & 0 deletions resources/config.yaml
@@ -27,6 +27,7 @@ archive: ['logs'] # list of output folders that should be archived by defa
 setup: # configuration namespace for the framework setup phase.
   live_output: true # set to true to stream the output of setup commands, if false they are only printed when setup is complete.
   activity_timeout: 600 # when using live output, subprocess will be considered as hanging if nothing was printed during this activity time.
+                        # No effect if live_output is set to 'False'.

 frameworks: # configuration namespace for the frameworks definitions.
   definition_file: # list of yaml files describing the frameworks base definitions.
18 changes: 15 additions & 3 deletions runbenchmark.py
@@ -18,6 +18,8 @@
     str2bool,
     str_sanitize,
     zip_path,
+    StaleProcessError,
+    Namespace,
 )
 from amlb import log, AutoMLError
 from amlb.defaults import default_dirs
@@ -358,9 +360,19 @@
             args.mode,
         )

-    bench.setup(amlb.SetupMode[args.setup])
-    if args.setup != "only":
-        res = bench.run(args.task, args.fold)
+    try:
+        bench.setup(amlb.SetupMode[args.setup])
+    except StaleProcessError as e:
Collaborator Author

I'll move to exception notes instead and/or revise this structure. The reason I did it this way is to communicate more clearly to the user with a final message what went wrong and how to solve it. I want to generally make errors easier to parse, as there are some issues opened that are completely solvable from the traceback, but users can't/don't try to parse those.

+        setting = "setup.activity_timeout"
+        timeout = Namespace.get(amlb_res.config, setting)
+        log.error(
+            f"Process '{e.cmd}' was aborted after producing no output for {timeout} seconds. "
+            f"If the process is expected to take more time, please raise the '{setting}' limit."
+        )
+        exit_code = 1
+    else:
+        if args.setup != "only":
+            res = bench.run(args.task, args.fold)
 except (ValueError, AutoMLError) as e:
     log.error("\nERROR:\n%s", e)
     if extras.get("verbose") is True:
20 changes: 20 additions & 0 deletions tests/unit/amlb/utils/process/test_run.py
@@ -0,0 +1,20 @@
+import pytest
+
+from amlb.utils import StaleProcessError, run_cmd
+
+
+@pytest.mark.parametrize("mode", ["line", "block", True])
+def test_subprocess_detects_stale_process(mode):
+    one_ms = 0.001
+    with pytest.raises(StaleProcessError):
+        run_cmd(f"sleep {one_ms}", _activity_timeout_=one_ms / 2, _live_output_=mode)
+
+
+def test_subprocess_does_not_raises_if_process_exits_early():
+    run_cmd("echo hi", _activity_timeout_=1, _live_output_=True)
+
+
+@pytest.mark.xfail(reason="Detection of stale processes currently requires output")
+def test_subprocess_does_not_raises_on_silent_process():
+    one_ms = 0.001
+    run_cmd(f"sleep {one_ms}", _activity_timeout_=one_ms / 2, _live_output_=True)