[RLlib] Preparatory PR for multi-agent, multi-GPU learning agent (alpha-star style) #02. #21649

Merged

Conversation

@sven1977 (Contributor) commented Jan 17, 2022

Preparatory PR for multi-agent, multi-GPU learning agent (alpha-star style) #2.

  • Add asynchronous_parallel_requests utility to rllib/execution/parallel_requests.py. It allows sending up to n in-flight parallel remote requests to a set of actors and collecting and returning those results that are available within a given timeout (see the sketch after this list).
  • Docstring/comment cleanups.
  • MixinMultiAgentReplayBuffer: will be used by the (in-the-pipeline) AlphaStar algo as the default buffer on each GPU node to store training data for the n remote policy actors on the same node.
  • Prep the Policy class to be usable as a remote ray actor, e.g. by adding get_host() and a (preliminary) learn_on_batch_from_replay_buffer method.

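As a rough illustration of the asynchronous_parallel_requests idea referenced above (the real utility lives in rllib/execution/parallel_requests.py), here is a minimal, hypothetical sketch of the underlying pattern: cap the number of in-flight requests per actor and return whatever ray.wait reports as ready within a timeout. SamplerActor, poll_actors, max_in_flight, and timeout are made-up names for this example, not RLlib's API.

    import ray

    @ray.remote
    class SamplerActor:
        """Hypothetical actor for the example (not an RLlib class)."""
        def sample(self):
            return "batch"

    def poll_actors(actors, in_flight, max_in_flight=2, timeout=0.1):
        """Keep up to `max_in_flight` requests per actor in flight and return
        the results that become ready within `timeout` seconds."""
        # Launch new requests for actors that still have capacity.
        for actor in actors:
            while len(in_flight[actor]) < max_in_flight:
                in_flight[actor].append(actor.sample.remote())
        all_refs = [ref for refs in in_flight.values() for ref in refs]
        if not all_refs:
            return []
        # Collect whatever is ready within the timeout; the rest stays in flight.
        ready, _ = ray.wait(all_refs, num_returns=len(all_refs), timeout=timeout)
        ready_set = set(ready)
        for actor in actors:
            in_flight[actor] = [r for r in in_flight[actor] if r not in ready_set]
        return ray.get(ready)

    if __name__ == "__main__":
        ray.init()
        actors = [SamplerActor.remote() for _ in range(2)]
        in_flight = {actor: [] for actor in actors}
        print(poll_actors(actors, in_flight))
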
Why are these changes needed?

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@gjoliver (Member) left a comment

overall looks great. a few comments, thanks.

buffer = self.replay_buffers[policy_id]
# If buffer empty or no new samples to mix with replayed ones,
# return None.
if len(buffer) == 0 or len(buffer.last_added_batches) == 0:

Member:

may be safer to check:

if len(buffer) == 0 and self.replay_ratio > 0.0:
    return None
if len(buffer.last_added_batches) == 0 and self.replay_ratio < 1.0:
    return None

In other words, if we need to have replay samples and the buffer is empty, or we actually need to have newly added samples, but there isn't any, return None.

Contributor Author (sven1977):

Not sure, actually.
If len(buffer) == 0, then self.last_added_batches should also be empty.
Everything that's in self.last_added_batches is guaranteed to be part of the buffer already.

What we should check, though, is whether replay_ratio == 1.0 (in that case, we do NOT care about buffer.last_added_batches, because 100% of the returned samples are replayed (older) ones).

I'll fix this.
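
For illustration, the resulting check could look roughly like the hypothetical helper below (a sketch based on the discussion above, not the PR's actual code):

    def no_batch_possible(buffer, replay_ratio):
        """Hypothetical helper: True if neither replay nor mixing can produce a batch."""
        # Completely empty buffer -> nothing to return at all.
        if len(buffer) == 0:
            return True
        # Unless 100% of the output is replayed (replay_ratio == 1.0), we also
        # need at least one freshly added batch to mix in.
        if replay_ratio < 1.0 and len(buffer.last_added_batches) == 0:
            return True
        return False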

Contributor Author (sven1977):

done

Member:

yeah, that's pretty much what I meant. thanks.

# replay ratio = old / [old + new]
num_new = len(output_batches)
num_old = 0
while random.random() > num_old / (num_old + num_new):

Member:

we are not looking at self.replay_ratio here?
can you explain a bit how this while loop is statistically connected to self.replay_ratio?

Member:

sorry, I am still confused. old / [old + new] is the current replay_ratio, right?
the logic is that we stop adding replayed samples once random.random() is less than the current replay_ratio, which by gut feeling should result in an expected ratio of 0.5?

I feel like at the very least, self.replay_ratio should get involved in the math here, something like:

expected_replay_batches = self.replay_ratio * num_new
while random.random() < expected_replay_batches:
    output_batches.append(buffer.replay())
    expected_replay_batches -= 1.0

Contributor Author (sven1977), Jan 26, 2022:

This has been fixed. There is also a test case now that verifies that different given ratios are respected.

The randomness is necessary here to allow for "odd" ratios. E.g., if we assume there is always just one new batch and the replay ratio is, say, 0.33, then only every 2nd returned batch should have 1 additional old batch in it.
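
To make the arithmetic concrete, here is a minimal sketch of how a ratio like 0.33 can be respected in expectation (hypothetical helper and names, assuming replay_ratio < 1.0; not the PR's actual implementation):

    import random

    def add_replayed_batches(output_batches, buffer, replay_ratio):
        """Append old (replayed) batches so that, in expectation,
        old / (old + new) == replay_ratio."""
        assert 0.0 <= replay_ratio < 1.0
        num_new = len(output_batches)
        expected_old = replay_ratio / (1.0 - replay_ratio) * num_new
        num_old = int(expected_old)
        # Add one more old batch with the leftover fractional probability, so an
        # "odd" ratio like 0.33 (with one new batch per call) adds an extra old
        # batch on roughly every other call.
        if random.random() < expected_old - num_old:
            num_old += 1
        for _ in range(num_old):
            output_batches.append(buffer.replay())
        return output_batches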

`remote_fn()`, which will be applied to the actor(s) instead.

Args:
trainer: The Trainer object that we run the sampling for.

Member:

pass in remote_requests_in_flight, instead of the entire trainer?

Contributor Author (sven1977):

Fair enough.

Contributor Author (sven1977):

done

Member:

sorry, just double checking, it seems like trainer is still the parameter?

Contributor Author (sven1977):

Nope, this had been fixed. Could you take another look?



@ExperimentalAPI
def asynchronous_parallel_requests(

Member:

this function looks really familiar. are we not replacing some existing logic with this util func call somewhere?

Contributor Author (sven1977):

Yeah, sorry, moved it into a new module for better clarity: This function may not only be used to collect SampleBatches from a RolloutWorker, but works generically on any set (and types!) of ray remote actors.

Contributor Author (sven1977), Jan 25, 2022:

I removed the old code (not used yet anywhere anyways) in replay_ops.py.
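
To illustrate the "any actor type" point at the usage level, here is a hypothetical snippet (stand-in actor classes, not RLlib's, and plain ray.wait rather than the utility's real signature) that applies a custom remote_fn to two different actor types and gathers whatever finishes within a timeout:

    import ray

    @ray.remote
    class RolloutLikeActor:  # hypothetical stand-in
        def apply(self, fn):
            return fn("rollout")

    @ray.remote
    class ReplayLikeActor:  # hypothetical stand-in
        def apply(self, fn):
            return fn("replay")

    def remote_fn(tag):
        return f"sample from {tag}"

    ray.init()
    actors = [RolloutLikeActor.remote(), ReplayLikeActor.remote()]
    refs = [a.apply.remote(remote_fn) for a in actors]
    # Keep only what finished within the timeout; the rest stays pending.
    ready, _ = ray.wait(refs, num_returns=len(refs), timeout=1.0)
    results = ray.get(ready)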

"""
# Sample a batch from the given replay actor.
# For better performance, make sure the replay actor is co-located
# with this policy (on the same node).

Member:

maybe this comment needs to be updated?
where are we making sure the replay_actor is co-located with the policy?

Contributor Author (sven1977):

The driver (execution plan/Trainer.setup) needs to make sure of that. But it's not a hard requirement, just better for performance, since we then don't have to send the batch across the wire.

Member:

I see. can you update the comment a little bit and say:

For better performance, trainer will try to schedule replay actors co-located with this policy

something like that? thanks.

Contributor Author (sven1977):

I'm not sure this comment should be here (we don't know for sure what the Trainer will do; the policy has no influence on its Trainer). We should simply clarify that it would be better if they WERE on the same node, but that it's not a hard requirement, and that the Trainer (which creates the buffer and the policy) needs to take care of this, not the policy itself.

I'll fix.

@sven1977 sven1977 requested a review from avnishn as a code owner January 25, 2022 09:37

@gjoliver (Member) left a comment

couple of minor comments, one math question. thanks.

@sven1977 sven1977 added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Jan 27, 2022

@gjoliver (Member) left a comment

sorry, still have 2 comments.

# Mix buffer's last added batches with older replayed batches.
with self.replay_timer:
output_batches = self.last_added_batches[policy_id].copy()
self.last_added_batches[policy_id].clear()

Member:

since you clear right after copy, why not:

output_batches = self.last_added_batches[policy_id]
self.last_added_batches[policy_id] = []

?

Member:

this is to save a .copy() op

Member:

do you think this is a good idea? this is the only comment I have left.

Contributor Author (sven1977):

Great catch! Indeed, it saves the copy. Done. :)
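
For reference, juxtaposing the two snippets from this thread shows the saving: the first version copies the list and then clears the original, the second hands over the existing list and starts a fresh one.

    # Before: copy, then clear in place (one extra list copy).
    output_batches = self.last_added_batches[policy_id].copy()
    self.last_added_batches[policy_id].clear()

    # After: reuse the existing list object and reset the slot (no copy).
    output_batches = self.last_added_batches[policy_id]
    self.last_added_batches[policy_id] = []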

@gjoliver (Member) commented Jan 27, 2022 via email

…ntralized_multi_agent_learning_02
# Conflicts:
#	rllib/execution/rollout_ops.py
@sven1977 sven1977 merged commit ee41800 into ray-project:master Jan 27, 2022
@@ -38,7 +38,7 @@ def __init__(self,

         # By default, use Glorot unform initializer.
         if initializer is None:
-            initializer = flax.nn.initializers.xavier_uniform()
+            initializer = nn.initializers.xavier_uniform()

Member:

@sven1977 I'm seeing AttributeError: module 'flax' has no attribute 'nn' on 1.11.0 release branch. Is this line cherry-pickable, or does it need to be cherry picked into 1.11.0?

Contributor Author (sven1977):

Yes, this line is cherry-pickable. Let's also fix the comment:

Fixed code:

        # By default, use Glorot uniform initializer.
        if initializer is None:
            initializer = nn.initializers.xavier_uniform()
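
Note: the fixed line assumes the surrounding module binds Flax's Linen API to the name `nn`. As an assumption for illustration only (the actual import in RLlib may differ, e.g. via its try_import helpers), a standalone equivalent would be:

    import flax.linen as nn  # assumption: `nn` refers to flax.linen

    initializer = nn.initializers.xavier_uniform()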

Contributor Author (sven1977):

@mwtian ^^

Labels
tests-ok The tagger certifies test failures are unrelated and assumes personal liability.

4 participants