diff --git a/com.unity.ml-agents/CHANGELOG.md b/com.unity.ml-agents/CHANGELOG.md
index 9461ae736d..d6cb0f86f9 100755
--- a/com.unity.ml-agents/CHANGELOG.md
+++ b/com.unity.ml-agents/CHANGELOG.md
@@ -18,7 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 - Update Barracuda to 0.6.0-preview
 
 ### Bugfixes
-
+- Fixed an issue which caused self-play training sessions to consume a lot of memory. (#3451)
 
 ## [0.14.0-preview] - 2020-02-13
 
diff --git a/ml-agents/mlagents/trainers/ghost/trainer.py b/ml-agents/mlagents/trainers/ghost/trainer.py
index 1d77f2c0c9..a4c78f365a 100644
--- a/ml-agents/mlagents/trainers/ghost/trainer.py
+++ b/ml-agents/mlagents/trainers/ghost/trainer.py
@@ -40,6 +40,7 @@ def __init__(
 
         self.internal_policy_queues: List[AgentManagerQueue[Policy]] = []
         self.internal_trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
+        self.ignored_trajectory_queues: List[AgentManagerQueue[Trajectory]] = []
         self.learning_policy_queues: Dict[str, AgentManagerQueue[Policy]] = {}
 
         # assign ghost's stats collection to wrapped trainer's
@@ -134,10 +135,14 @@ def advance(self) -> None:
             self.trajectory_queues, self.internal_trajectory_queues
         ):
             try:
-                t = traj_queue.get_nowait()
-                # adds to wrapped trainers queue
-                internal_traj_queue.put(t)
-                self._process_trajectory(t)
+                # We grab at most the maximum length of the queue.
+                # This ensures that even if the queue is being filled faster than it is
+                # being emptied, the trajectories in the queue are on-policy.
+                for _ in range(traj_queue.maxlen):
+                    t = traj_queue.get_nowait()
+                    # adds to wrapped trainers queue
+                    internal_traj_queue.put(t)
+                    self._process_trajectory(t)
             except AgentManagerQueue.Empty:
                 pass
 
@@ -162,6 +167,14 @@ def advance(self) -> None:
                 self._swap_snapshots()
                 self.last_swap = self.get_step
 
+        # Dump trajectories from non-learning policy
+        for traj_queue in self.ignored_trajectory_queues:
+            try:
+                for _ in range(traj_queue.maxlen):
+                    traj_queue.get_nowait()
+            except AgentManagerQueue.Empty:
+                pass
+
     def end_episode(self):
         self.trainer.end_episode()
 
@@ -256,6 +269,8 @@ def subscribe_trajectory_queue(
 
             self.internal_trajectory_queues.append(internal_trajectory_queue)
             self.trainer.subscribe_trajectory_queue(internal_trajectory_queue)
+        else:
+            self.ignored_trajectory_queues.append(trajectory_queue)
 
 
 # Taken from https://github.com/Unity-Technologies/ml-agents/pull/1975 and
diff --git a/ml-agents/mlagents/trainers/tests/test_ghost.py b/ml-agents/mlagents/trainers/tests/test_ghost.py
index cae5ac781f..316e85ea0a 100644
--- a/ml-agents/mlagents/trainers/tests/test_ghost.py
+++ b/ml-agents/mlagents/trainers/tests/test_ghost.py
@@ -152,6 +152,8 @@ def test_process_trajectory(dummy_config):
 
     # Check that ghost trainer ignored off policy queue
    assert trainer.trainer.update_buffer.num_experiences == 15
+    # Check that it emptied the queue
+    assert trajectory_queue1.empty()
 
 
 def test_publish_queue(dummy_config):
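
The core of the fix above is a bounded-drain pattern: each advance() call pulls at most maxlen trajectories from a queue, and queues feeding the non-learning (ghost) policy are emptied rather than left to grow. The following is a minimal, self-contained sketch of that pattern, assuming only the surface implied by the diff; the AgentManagerQueue stand-in, its maxlen default, and the drain_at_most_maxlen helper are illustrative, not the actual ml-agents implementation.

# Minimal sketch of the bounded-drain pattern shown in the diff above.
# AgentManagerQueue here is a stand-in with the interface implied by the
# ghost trainer code (maxlen, put(), get_nowait(), a nested Empty exception).
from queue import Queue, Empty


class AgentManagerQueue:
    """Stand-in queue that raises its own Empty when drained."""

    class Empty(Exception):
        pass

    def __init__(self, maxlen: int = 20):
        self.maxlen = maxlen
        self._queue: Queue = Queue()

    def put(self, item) -> None:
        self._queue.put(item)

    def get_nowait(self):
        try:
            return self._queue.get_nowait()
        except Empty:
            raise AgentManagerQueue.Empty()


def drain_at_most_maxlen(traj_queue: AgentManagerQueue) -> list:
    # Pull at most maxlen items per call, so a producer that outpaces the
    # consumer cannot force a single pass to process unbounded backlog.
    drained = []
    try:
        for _ in range(traj_queue.maxlen):
            drained.append(traj_queue.get_nowait())
    except AgentManagerQueue.Empty:
        pass
    return drained


if __name__ == "__main__":
    q = AgentManagerQueue(maxlen=5)
    for i in range(8):  # producer runs ahead of the consumer
        q.put(i)
    print(drain_at_most_maxlen(q))  # first 5 items
    print(drain_at_most_maxlen(q))  # remaining 3 on the next pass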