Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ProcConcatVec close fix #219

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion supersuit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ def __getattr__(wrapper_name):
raise ImportError(f"cannot import name '{wrapper_name}' from 'supersuit'")


__version__ = "3.8.0"
__version__ = "3.8.1"
43 changes: 25 additions & 18 deletions supersuit/vector/multiproc_vec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import multiprocessing as mp
import time
import traceback

import gymnasium.vector
Expand Down Expand Up @@ -133,6 +134,8 @@ def __init__(
self.observation_space, self.shared_obs, n=self.num_envs
)

self.graceful_shutdown_timeout = 10

pipes = []
procs = []
for constr in vec_env_constrs:
Expand Down Expand Up @@ -219,13 +222,7 @@ def step(self, actions):
return self.step_wait()

def __del__(self):
for pipe in self.pipes:
try:
pipe.send("terminate")
except ConnectionError:
pass
for proc in self.procs:
proc.join()
self.close()

def render(self):
self.pipes[0].send("render")
Expand All @@ -239,17 +236,27 @@ def render(self):
return render_result

def close(self):
for pipe in self.pipes:
pipe.send("close")
for pipe in self.pipes:
try:
pipe.recv()
except EOFError:
raise RuntimeError(
"only one multiproccessing vector environment can open a window over the duration of a process"
)
except ConnectionError:
pass
try:
for pipe, proc in zip(self.pipes, self.procs):
if proc.is_alive():
pipe.send("close")
except OSError:
pass
else:
deadline = (
None
if self.graceful_shutdown_timeout is None
else time.monotonic() + self.graceful_shutdown_timeout
)
for proc in self.procs:
timeout = None if deadline is None else deadline - time.monotonic()
if timeout is not None and timeout <= 0:
break
proc.join(timeout)
for pipe, proc in zip(self.pipes, self.procs):
if proc.is_alive():
proc.kill()
pipe.close()

def env_is_wrapped(self, wrapper_class, indices=None):
for i, pipe in enumerate(self.pipes):
Expand Down