diff --git a/rllab/dynamics_randomization/mujoco_model_gen.py b/rllab/dynamics_randomization/mujoco_model_gen.py index 108fa3991..60840726a 100644 --- a/rllab/dynamics_randomization/mujoco_model_gen.py +++ b/rllab/dynamics_randomization/mujoco_model_gen.py @@ -1,5 +1,5 @@ import atexit -import sys +from queue import Queue from threading import Event from threading import Thread @@ -32,6 +32,8 @@ def __init__(self, file_path, variations): """ self._variations = variations self._file_path = file_path + # Synchronized queue to store mujoco_models + self._models = Queue(maxsize=10) # Worker Thread self._worker_thread = Thread( target=self._generator_routine, daemon=True, name="Worker-Thread") @@ -60,17 +62,10 @@ def get_model(self): user in this class. """ if not self._worker_thread.is_alive(): - # If worker thread is dead because of an error, raise an error in main thread + # If worker thread terminates because of an error, terminates main thread raise ChildProcessError("Error raised in Worker-Thread") - if not self._model_ready.is_set(): - # If the model is not ready yet, wait for it to be finished. - self._model_ready.wait() - # Cleat the event flag for the next iteration - self._model_ready.clear() - # Request a new model to the worker thread. - self._model_requested.set() - return self._mujoco_model + return self._models.get() def stop(self): """ @@ -78,6 +73,9 @@ def stop(self): randomized environment is terminated or when the training is interrupted. """ if self._worker_thread.is_alive(): + while not self._models.empty(): + self._models.get() + self._model_requested.set() self._stop_event.set() self._worker_thread.join() @@ -126,9 +124,4 @@ def _generator_routine(self): model_xml = etree.tostring(parsed_model.getroot()).decode("ascii") self._mujoco_model = load_model_from_xml(model_xml) - - # Wake up the calling thread if it was waiting - self._model_ready.set() - # Go to idle mode (wait for event) - self._model_requested.wait() - self._model_requested.clear() + self._models.put(self._mujoco_model)