Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where run_command blocks when it shouldn't #2169

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion metaflow/plugins/argo/argo_workflows_deployer_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def suspend(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

def unsuspend(self, **kwargs) -> bool:
Expand Down Expand Up @@ -131,6 +132,7 @@ def unsuspend(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

def terminate(self, **kwargs) -> bool:
Expand Down Expand Up @@ -165,6 +167,7 @@ def terminate(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

@property
Expand Down Expand Up @@ -319,6 +322,7 @@ def delete(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

def trigger(self, **kwargs) -> ArgoWorkflowsTriggeredRun:
Expand Down Expand Up @@ -361,7 +365,7 @@ def trigger(self, **kwargs) -> ArgoWorkflowsTriggeredRun:
content = handle_timeout(
attribute_file_fd, command_obj, self.deployer.file_read_timeout
)

command_obj.sync_wait()
if command_obj.process.returncode == 0:
return ArgoWorkflowsTriggeredRun(
deployer=self.deployer, content=content
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def terminate(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0


Expand Down Expand Up @@ -174,6 +175,7 @@ def delete(self, **kwargs) -> bool:
)

command_obj = self.deployer.spm.get(pid)
command_obj.sync_wait()
return command_obj.process.returncode == 0

def trigger(self, **kwargs) -> StepFunctionsTriggeredRun:
Expand Down Expand Up @@ -217,6 +219,7 @@ def trigger(self, **kwargs) -> StepFunctionsTriggeredRun:
attribute_file_fd, command_obj, self.deployer.file_read_timeout
)

command_obj.sync_wait()
if command_obj.process.returncode == 0:
return StepFunctionsTriggeredRun(
deployer=self.deployer, content=content
Expand Down
2 changes: 1 addition & 1 deletion metaflow/runner/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Deployer(metaclass=DeployerMeta):
The directory to run the subprocess in; if not specified, the current
directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the deployer attribute file.
The timeout until which we try to read the deployer attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the deployment command.
Expand Down
4 changes: 2 additions & 2 deletions metaflow/runner/deployer_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class variable that matches the name of the CLI group.
The directory to run the subprocess in; if not specified, the current
directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the deployer attribute file.
The timeout until which we try to read the deployer attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the deployment command.
Expand Down Expand Up @@ -144,7 +144,7 @@ def _create(
# Additional info is used to pass additional deployer specific information.
# It is used in non-OSS deployers (extensions).
self.additional_info = content.get("additional_info", {})

command_obj.sync_wait()
if command_obj.process.returncode == 0:
return create_class(deployer=self)

Expand Down
5 changes: 4 additions & 1 deletion metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class Runner(object):
The directory to run the subprocess in; if not specified, the current
directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the runner attribute file.
The timeout until which we try to read the runner attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the `run` command.
Expand Down Expand Up @@ -272,6 +272,9 @@ async def __aenter__(self) -> "Runner":

def __get_executing_run(self, attribute_file_fd, command_obj):
content = handle_timeout(attribute_file_fd, command_obj, self.file_read_timeout)

command_obj.sync_wait()

content = json.loads(content)
pathspec = "%s/%s" % (content.get("flow_name"), content.get("run_id"))

Expand Down
2 changes: 2 additions & 0 deletions metaflow/runner/nbdeploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class NBDeployer(object):
base_dir : str, optional, default None
The directory to run the subprocess in; if not specified, the current
working directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the deployer attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` i.e. options
listed in `python myflow.py --help`
Expand Down
2 changes: 1 addition & 1 deletion metaflow/runner/nbrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class NBRunner(object):
The directory to run the subprocess in; if not specified, the current
working directory is used.
file_read_timeout : int, default 3600
The timeout until which we try to read the runner attribute file.
The timeout until which we try to read the runner attribute file (in seconds).
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the `run` command.
Expand Down
4 changes: 3 additions & 1 deletion metaflow/runner/subprocess_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ def run_command(
"""
Run a command synchronously and return its process ID.

Note: in no case does this wait for the process to *finish*. Use sync_wait()
to wait for the command to finish.

Parameters
----------
command : List[str]
Expand All @@ -145,7 +148,6 @@ def run_command(
command_obj = CommandManager(command, env, cwd)
pid = command_obj.run(show_output=show_output)
self.commands[pid] = command_obj
command_obj.sync_wait()
return pid

async def async_run_command(
Expand Down
57 changes: 37 additions & 20 deletions metaflow/runner/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def read_from_fifo_when_ready(
encoding : str, optional
Encoding to use while reading the file, by default "utf-8".
timeout : int, optional
Timeout for reading the file in milliseconds, by default 3600.
Timeout for reading the file in seconds, by default 3600.

Returns
-------
Expand All @@ -107,30 +107,47 @@ def read_from_fifo_when_ready(
content to the FIFO.
"""
content = bytearray()

poll = select.poll()
poll.register(fifo_fd, select.POLLIN)

max_timeout = 3 # Wait for 10 * 3 = 30 ms after last write
while True:
poll_begin = time.time()
poll.poll(timeout)
timeout -= 1000 * (time.time() - poll_begin)

if timeout <= 0:
if timeout < 0:
raise TimeoutError("Timeout while waiting for the file content")

poll_begin = time.time()
# We poll for a very short time to be also able to check if the file was closed
# If the file is closed, we assume that we only have one writer so if we have
# data, we break out. This is to work around issues in macos
events = poll.poll(min(10, timeout * 1000))
timeout -= time.time() - poll_begin

try:
data = os.read(fifo_fd, 128)
while data:
data = os.read(fifo_fd, 8192)
if data:
content += data
data = os.read(fifo_fd, 128)

# Read from a non-blocking closed FIFO returns an empty byte array
break

else:
if len(events):
# We read an EOF -- consider the file done
break
else:
# We had no events (just a timeout) and the read didn't return
# an exception so the file is still open; we continue waiting for data
# Unfortunately, on MacOS, it seems that even *after* the file is
# closed on the other end, we still don't get a BlockingIOError so
# we hack our way and timeout if there is no write in 30ms which is
# a relative eternity for file writes.
if content:
if max_timeout <= 0:
break
max_timeout -= 1
continue
except BlockingIOError:
# FIFO is open but no data is available yet
continue
has_blocking_error = True
if content:
# The file was closed
break
# else, if we have no content, we continue waiting for the file to be open
# and written to.

if not content and check_process_exited(command_obj):
raise CalledProcessError(command_obj.process.returncode, command_obj.command)
Expand All @@ -156,7 +173,7 @@ async def async_read_from_fifo_when_ready(
encoding : str, optional
Encoding to use while reading the file, by default "utf-8".
timeout : int, optional
Timeout for reading the file in milliseconds, by default 3600.
Timeout for reading the file in seconds, by default 3600.

Returns
-------
Expand Down Expand Up @@ -206,7 +223,7 @@ def handle_timeout(
command_obj : CommandManager
Command manager object that encapsulates the running command details.
file_read_timeout : int
Timeout for reading the file.
Timeout for reading the file, in seconds

Returns
-------
Expand Down Expand Up @@ -243,7 +260,7 @@ async def async_handle_timeout(
command_obj : CommandManager
Command manager object that encapsulates the running command details.
file_read_timeout : int
Timeout for reading the file.
Timeout for reading the file, in seconds

Returns
-------
Expand Down
Loading