[bug-fix] When agent isn't training, don't clear update buffer (#5205)
* Don't clear update buffer, but don't append to it either

* Update changelog

* Address comments

* Make experience replay buffer saving more verbose
Ervin T committed Apr 1, 2021
1 parent d17b735 commit 63e7ad4
Showing 6 changed files with 59 additions and 20 deletions.
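
In short: each trainer previously appended trajectories to the update buffer itself via `resequence_and_append`, and `advance()` cleared the buffer once `should_still_train` became false, which is what wiped SAC's replay buffer before `save_replay_buffer()` could write it out. The fix moves the append into a single guarded helper on `RLTrainer` and drops the clear. A standalone, simplified sketch of the new behavior (the real helper is in the rl_trainer.py hunk below; `MiniTrainer` here is a toy stand-in, not ml-agents code):

# Simplified sketch of the fix: stop appending once training ends, but never
# clear what was already collected, so it can still be saved at the end of a run.
from typing import List


class MiniTrainer:
    def __init__(self, max_steps: int) -> None:
        self.max_steps = max_steps
        self.step = 0
        self.update_buffer: List[int] = []  # stands in for the AgentBuffer

    @property
    def should_still_train(self) -> bool:
        return self.step < self.max_steps

    def append_to_update_buffer(self, trajectory: List[int]) -> None:
        # Old behavior cleared the buffer when training stopped; now we simply
        # skip the append and keep the buffer intact for saving.
        if self.should_still_train:
            self.update_buffer.extend(trajectory)


trainer = MiniTrainer(max_steps=20)
for _ in range(5):
    trainer.append_to_update_buffer([0] * 10)
    trainer.step += 10

print(len(trainer.update_buffer))  # 20: appends stop at max_steps, buffer is kept
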
2 changes: 1 addition & 1 deletion com.unity.ml-agents/CHANGELOG.md
@@ -48,7 +48,7 @@ depend on the previous behavior, you can explicitly set the Agent's `InferenceDevice`
 settings. Unfortunately, this may require retraining models if it changes the resulting order of the sensors
 or actuators on your system. (#5194)
 #### ml-agents / ml-agents-envs / gym-unity (Python)
 
+- Fixed a bug where the SAC replay buffer would not be saved out at the end of a run, even if `save_replay_buffer` was enabled. (#5205)
 
 ## [1.9.0-preview] - 2021-03-17
 ### Major Changes
5 changes: 1 addition & 4 deletions ml-agents/mlagents/trainers/poca/trainer.py
@@ -166,10 +166,7 @@ def _process_trajectory(self, trajectory: Trajectory) -> None:
         )
         agent_buffer_trajectory[BufferKey.ADVANTAGES].set(global_advantages)
 
-        # Append to update buffer
-        agent_buffer_trajectory.resequence_and_append(
-            self.update_buffer, training_length=self.policy.sequence_length
-        )
+        self._append_to_update_buffer(agent_buffer_trajectory)
 
         # If this was a terminal trajectory, append stats and reset reward collection
         if trajectory.done_reached:
6 changes: 2 additions & 4 deletions ml-agents/mlagents/trainers/ppo/trainer.py
@@ -149,10 +149,8 @@ def _process_trajectory(self, trajectory: Trajectory) -> None:
         global_returns = list(np.mean(np.array(tmp_returns, dtype=np.float32), axis=0))
         agent_buffer_trajectory[BufferKey.ADVANTAGES].set(global_advantages)
         agent_buffer_trajectory[BufferKey.DISCOUNTED_RETURNS].set(global_returns)
-        # Append to update buffer
-        agent_buffer_trajectory.resequence_and_append(
-            self.update_buffer, training_length=self.policy.sequence_length
-        )
+
+        self._append_to_update_buffer(agent_buffer_trajectory)
 
         # If this was a terminal trajectory, append stats and reset reward collection
         if trajectory.done_reached:
10 changes: 5 additions & 5 deletions ml-agents/mlagents/trainers/sac/trainer.py
@@ -104,9 +104,12 @@ def save_replay_buffer(self) -> None:
         Save the training buffer's update buffer to a pickle file.
         """
         filename = os.path.join(self.artifact_path, "last_replay_buffer.hdf5")
-        logger.info(f"Saving Experience Replay Buffer to {filename}")
+        logger.info(f"Saving Experience Replay Buffer to {filename}...")
         with open(filename, "wb") as file_object:
             self.update_buffer.save_to_file(file_object)
+        logger.info(
+            f"Saved Experience Replay Buffer ({os.path.getsize(filename)} bytes)."
+        )
 
     def load_replay_buffer(self) -> None:
         """
@@ -175,10 +178,7 @@ def _process_trajectory(self, trajectory: Trajectory) -> None:
                 agent_buffer_trajectory[ObsUtil.get_name_at_next(i)][-1] = obs
             agent_buffer_trajectory[BufferKey.DONE][-1] = False
 
-        # Append to update buffer
-        agent_buffer_trajectory.resequence_and_append(
-            self.update_buffer, training_length=self.policy.sequence_length
-        )
+        self._append_to_update_buffer(agent_buffer_trajectory)
 
         if trajectory.done_reached:
             self._update_end_episode_stats(agent_id, self.optimizer)
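
For context on the `save_replay_buffer` path touched above: the buffer is written to `last_replay_buffer.hdf5` and read back when a run is resumed. A minimal sketch of reading such a file outside the trainer, assuming `AgentBuffer.load_from_file` is the counterpart of the `save_to_file` call in the hunk above and using a hypothetical results path:

# Sketch only: restore a saved SAC replay buffer into a fresh AgentBuffer.
# The results path is hypothetical; load_from_file is assumed to mirror the
# save_to_file call shown in the save_replay_buffer hunk above.
import os

from mlagents.trainers.buffer import AgentBuffer

filename = os.path.join("results/sac_run/MyBehavior", "last_replay_buffer.hdf5")

restored = AgentBuffer()
with open(filename, "rb") as file_object:
    restored.load_from_file(file_object)

print(f"Restored buffer holds {restored.num_experiences} experiences.")
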
39 changes: 35 additions & 4 deletions ml-agents/mlagents/trainers/tests/test_rl_trainer.py
@@ -77,8 +77,7 @@ def test_clear_update_buffer():
 
 
 @mock.patch("mlagents.trainers.trainer.trainer.Trainer.save_model")
-@mock.patch("mlagents.trainers.trainer.rl_trainer.RLTrainer._clear_update_buffer")
-def test_advance(mocked_clear_update_buffer, mocked_save_model):
+def test_advance(mocked_save_model):
     trainer = create_rl_trainer()
     mock_policy = mock.Mock()
     trainer.add_policy("TestBrain", mock_policy)
@@ -115,9 +114,8 @@ def test_advance(mocked_clear_update_buffer, mocked_save_model):
     with pytest.raises(AgentManagerQueue.Empty):
         policy_queue.get_nowait()
 
-    # Check that the buffer has been cleared
+    # Check that no model has been saved
     assert not trainer.should_still_train
-    assert mocked_clear_update_buffer.call_count > 0
     assert mocked_save_model.call_count == 0
 
 
@@ -181,6 +179,39 @@ def test_summary_checkpoint(mock_add_checkpoint, mock_write_summary):
     mock_add_checkpoint.assert_has_calls(add_checkpoint_calls)
 
 
+def test_update_buffer_append():
+    trainer = create_rl_trainer()
+    mock_policy = mock.Mock()
+    trainer.add_policy("TestBrain", mock_policy)
+    trajectory_queue = AgentManagerQueue("testbrain")
+    policy_queue = AgentManagerQueue("testbrain")
+    trainer.subscribe_trajectory_queue(trajectory_queue)
+    trainer.publish_policy_queue(policy_queue)
+    time_horizon = 10
+    trajectory = mb.make_fake_trajectory(
+        length=time_horizon,
+        observation_specs=create_observation_specs_with_shapes([(1,)]),
+        max_step_complete=True,
+        action_spec=ActionSpec.create_discrete((2,)),
+    )
+    agentbuffer_trajectory = trajectory.to_agentbuffer()
+    assert trainer.update_buffer.num_experiences == 0
+
+    # Check that if we append, our update buffer gets longer.
+    # max_steps = 100
+    for i in range(10):
+        trainer._process_trajectory(trajectory)
+        trainer._append_to_update_buffer(agentbuffer_trajectory)
+        assert trainer.update_buffer.num_experiences == (i + 1) * time_horizon
+
+    # Check that if we append after stopping training, nothing happens.
+    # We process enough trajectories to hit max steps
+    trainer.set_is_policy_updating(False)
+    trainer._process_trajectory(trajectory)
+    trainer._append_to_update_buffer(agentbuffer_trajectory)
+    assert trainer.update_buffer.num_experiences == (i + 1) * time_horizon
+
+
 class RLTrainerWarningTest(unittest.TestCase):
     def test_warning_group_reward(self):
         with self.assertLogs("mlagents.trainers", level="WARN") as cm:
17 changes: 15 additions & 2 deletions ml-agents/mlagents/trainers/trainer/rl_trainer.py
@@ -245,6 +245,21 @@ def _maybe_write_summary(self, step_after_process: int) -> None:
         if step_after_process >= self._next_summary_step and self.get_step != 0:
             self._write_summary(self._next_summary_step)
 
+    def _append_to_update_buffer(self, agentbuffer_trajectory: AgentBuffer) -> None:
+        """
+        Append an AgentBuffer to the update buffer. If the trainer isn't training,
+        don't update to avoid a memory leak.
+        """
+        if self.should_still_train:
+            seq_len = (
+                self.trainer_settings.network_settings.memory.sequence_length
+                if self.trainer_settings.network_settings.memory is not None
+                else 1
+            )
+            agentbuffer_trajectory.resequence_and_append(
+                self.update_buffer, training_length=seq_len
+            )
+
     def _maybe_save_model(self, step_after_process: int) -> None:
         """
         If processing the trajectory will make the step exceed the next model write,
@@ -298,5 +313,3 @@ def advance(self) -> None:
                         for q in self.policy_queues:
                             # Get policies that correspond to the policy queue in question
                             q.put(self.get_policy(q.behavior_id))
-        else:
-            self._clear_update_buffer()
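
One detail of the new helper worth noting: it derives the training length from `trainer_settings.network_settings` rather than from `self.policy.sequence_length`, which the removed per-trainer calls used. A small sketch of that fallback, assuming the `NetworkSettings`/`MemorySettings` constructor and field names in `mlagents.trainers.settings`:

# Sketch only (constructor/field names assumed): the helper uses the configured
# recurrent sequence length when memory is enabled, and sequences of length 1 otherwise.
from mlagents.trainers.settings import NetworkSettings

recurrent = NetworkSettings(memory=NetworkSettings.MemorySettings(sequence_length=32))
feedforward = NetworkSettings()  # memory defaults to None

for net in (recurrent, feedforward):
    seq_len = net.memory.sequence_length if net.memory is not None else 1
    print(seq_len)  # 32 for the recurrent settings, 1 for the feed-forward settings
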
