Treat actor creation like a regular task. #1668
Conversation
python/ray/actor.py
Outdated
nice
This is kind of silly, but when I need to remember to do something I introduce linting errors so that Travis will remind me.
resources={"CPU": actor_method_cpus},
max_calls=0))

if actor_creation_resources is not None:
What are the cases when this is not None?
Hm.. some of this needs to be rethought, but basically, submit_task looks up the task resource requirements based on the info in worker.function_properties, so we need to fill that out so that submit_task does the right thing. But maybe the right thing is for resources to be passed into submit_task instead.
Basically, whenever a worker might submit an actor creation task, then register_actor_signatures needs to have been called with actor_creation_resources not equal to None.
Hmm, are there cases where it is equal to None then?
When a worker turns into an actor, it calls register_actor_signatures, and the actor_creation_resources data isn't really available at that point. I could pass it in by storing it in redis or through some other mechanism, but it seemed unnecessary. If this sounds a little strange, part of the issue is that there is a difference in the data that is needed at function invocation time versus at function execution time, but we're using worker.function_properties to store both things.
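To make that flow concrete, here is a heavily simplified, hypothetical sketch (not Ray's actual code) of why the resource info has to be registered before submission: submit_task looks the requirements up rather than receiving them as an argument.

# Hypothetical, minimal sketch of the invocation-side flow described above.
# The real code stores this in worker.function_properties; a plain dict
# stands in for it here.
FUNCTION_PROPERTIES = {}


def register_actor_signatures(function_id, actor_creation_resources=None):
    # Callers that might submit the actor creation task must pass the
    # resources; a worker that is turning itself into an actor does not,
    # since it only needs execution-time information.
    if actor_creation_resources is not None:
        FUNCTION_PROPERTIES[function_id] = actor_creation_resources


def submit_task(function_id, args):
    # The resource requirements are looked up here, which is why they must
    # have been registered beforehand.
    resources = FUNCTION_PROPERTIES.get(function_id, {"CPU": 1})
    return {"function_id": function_id, "args": args, "resources": resources}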
actor_method_num_return_vals, method_signatures,
checkpoint_interval, class_name,
actor_creation_dummy_object_id,
actor_creation_resources, actor_method_cpus):
Do we need these extra fields in the serialized handle? I can see how we need the actor_creation_dummy_object_id, but not sure about the others.
(Actually on that note we probably can get rid of some of the other information here, like the checkpoint_interval.)
Hmm.. it's possibly not all needed. I can look through and get rid of the extras if you prefer.
I'm also planning on doing a follow up PR to try to simplify some of this code after this PR and the join consistency PR are merged.
Okay, maybe just leave a TODO about removing some of these fields then? I think we should be careful about adding too many fields to actor handles.
Yep, will add that.
if (ray.worker.global_worker.connected and
        self._ray_actor_handle_id.id() == ray.worker.NIL_ACTOR_ID):
    # TODO(rkn): Should we be passing in the actor cursor as a
    # dependency here?
I think it's okay for now, since we want the __ray_terminate__ task to get scheduled immediately.
Ok sounds good. Do you want me to remove the comment? By default I would leave it there because I think we may want to revisit this in the future.
Nah, let's leave the comment.
python/ray/monitor.py
Outdated
log.warn("Failed to remove object location for "
         "dead plasma manager.")

# TODO(rkn): Analogously to the above loop, we may want to remove
This should actually be okay since dummy object IDs are only known by the local scheduler, so the object manager never adds them to the object table.
Good point. I'll remove this.
log.info(
    "Driver {} has been removed.".format(binary_to_hex(driver_id)))

# Get a list of the local schedulers that have not been deleted.
Yay :)
(algorithm_state->available_workers.size() > 0) &&
can_run(algorithm_state, execution_spec)) {
can_run(algorithm_state, execution_spec) &&
state->static_resources["CPU"] != 0) {
Might make more sense to move this logic into resource_constraints_satisfied. Then we can also add logic to make sure that it only happens for actor creation tasks.
fixed
if (state->static_resources["CPU"] == 0) {
  // Give the task to the global scheduler to schedule.
  give_task_to_global_scheduler(state, algorithm_state, execution_spec);
Is this statement reachable if the global scheduler never assigns actor creation tasks to a node with no CPUs? If possible, let's avoid the state->static_resources["CPU"] == 0 checks, since they're a little cryptic if you don't know about the special case for actor resources.
Yeah, I think it's not possible. What if I change this to
// The global scheduler should never assign a task to a machine with 0 CPUs.
RAY_CHECK(state->static_resources["CPU"] != 0);
Sure, that sounds good.
*/
bool constraints_satisfied_hard(const LocalScheduler *scheduler,
                                const TaskSpec *spec) {
  if (scheduler->info.static_resources.at("CPU") == 0) {
Maybe not in this PR, but it would be great if we could store the error for when there are no feasible nodes as part of the return value for the task. How doable is that?
Hmm, what do you mean by "as part of the return value for the task"? E.g., store an exception object in the object store so that ray.get on the return value raises an exception? I think something like this would be useful. Right now we just print an error to STDERR at
ray/src/global_scheduler/global_scheduler_algorithm.cc
Lines 139 to 140 in 0fcceef
RAY_LOG(ERROR) << "Infeasible task. No nodes satisfy hard constraints for "
               << "task = " << Task_task_id(task);
It's possible for the lack of feasible nodes to be transient (e.g., if more nodes are being added to the cluster or some were just slow to start up).
Hmm, I didn't think about more nodes getting added to the cluster. Yeah, I was thinking about the possibility of storing the exception, but I agree, I think the printed error is good then.
pid = ray.get(a.getpid.remote())

# Make sure that we can't create another actor.
with self.assertRaises(Exception):
Should we propagate an error if we can't find enough resources to create an actor?
I think no, because other actors may go out of scope very soon, enabling this actor to be created.
Similarly, a task may not be runnable right away, but some resources will soon free up and then it can run.
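For illustration, a rough usage sketch of that behavior (assuming a Ray build with this PR's semantics; the exact timing of actor termination is asynchronous):

import ray

ray.init(num_cpus=1)  # a single-CPU setup, just for illustration

@ray.remote(num_cpus=1)
class Holder(object):
    def ping(self):
        return "pong"

a = Holder.remote()              # acquires the only CPU for its lifetime
print(ray.get(a.ping.remote()))  # "pong"

b = Holder.remote()              # no exception is raised here; the actor
                                 # creation task simply stays pending
# b (and b.ping.remote()) will only run once a CPU frees up, e.g. after the
# handle `a` goes out of scope and the first actor is terminated.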
@ray.remote
class Foo(object):
    def __init__(self):
Sort of off-topic, but is it okay to define an actor without an __init__ method? I had thought that we threw an error for that.
We print an error. It's valid Python and generally ok I think.
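A tiny example of what is being discussed: an actor class without an __init__ method is valid Python, and per the comment above Ray accepts it (it just prints an error message).

import ray

@ray.remote
class NoInit(object):  # no __init__ defined; still a usable actor class
    def value(self):
        return 42

# Usage (after ray.init()):
#   actor = NoInit.remote()
#   assert ray.get(actor.value.remote()) == 42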
const TaskSpec *spec) {
  if (scheduler->info.static_resources.at("CPU") == 0) {
  if (scheduler->info.static_resources.count("CPU") == 1 &&
      scheduler->info.static_resources.at("CPU") == 0) {
Should we add an extra check here that it's only for actor creation tasks?
What would be the reason for a special check? I'd encourage not special-casing anything. We need a common abstraction that works for both tasks and actors: both can be thought of as resource consumers, and the scheduler's job is to meet resource demand with resource supply. Once the correct common abstraction is identified, there should be no need for special cases.
Probably no need for a check here. However, the reason would be that currently actor creation tasks are the only tasks where the task resource requirements at runtime differ from the task resource requirements during scheduling (because during scheduling we need to be aware of subsequent method resource requirements).
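A hedged sketch of that distinction (hypothetical helper name, not Ray's scheduler code): a node with zero CPUs is treated as infeasible for an actor creation task even if the creation task itself needs no CPUs, because the actor's future method calls will.

def feasible_for(node_static_resources, task):
    # Ordinary feasibility: the node must have the statically declared
    # resources the task acquires at runtime.
    for resource, amount in task["creation_resources"].items():
        if node_static_resources.get(resource, 0) < amount:
            return False
    if task.get("is_actor_creation") and node_static_resources.get("CPU", 0) == 0:
        # Scheduling-time requirement that differs from the runtime one:
        # subsequent actor method tasks will need CPUs, so avoid 0-CPU nodes.
        return False
    return True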
ActorID actor_id,
bool is_worker,
int64_t num_gpus);
bool is_worker);
Dumb question: when are local scheduler clients not workers?
when they are drivers :)
"""
message = DriverTableMessage.GetRootAsDriverTableMessage(data, 0)
driver_id = message.DriverId()
log.info(
does it make sense to log after the cleanup?
Yeah, I think it's helpful. This is only in the monitor logs anyway, which should generally be redirected to a file.
oh I just meant to maybe log after L438?
python/ray/actor.py
Outdated
actor_method_name).id()
worker.function_properties[driver_id][function_id] = (
    # The extra return value is an actor dummy object.
    # In the cases wher actor_method_cpus is None, that value should
typo: where
thanks
function_id = compute_actor_creation_function_id(class_id)
worker.function_properties[driver_id][function_id.id()] = (
    # The extra return value is an actor dummy object.
    FunctionProperties(num_return_vals=0 + 1,
why not just 1 and be (a little more) explicit in the comment?
I thought this was clearer.
    each of the actor's methods.
actor_creation_resources: The resources required by the actor creation
    task.
actor_method_cpus: The number of CPUs required by each actor method.
Is this an int or a list? From the code, it seems like it's just an int, but this doc sounds like it is a number per method.
def remote_decorator(func_or_class):
    if inspect.isfunction(func_or_class) or is_cython(func_or_class):
        # Set the remote function default resources.
        resources["CPU"] = (DEFAULT_REMOTE_FUNCTION_CPUS
So if someone does num_gpus=2, custom_resources={"GPU": 1}, will the custom GPUs be overwritten?
If you include "GPU" in resources= then it will raise an exception. That can only be specified through num_gpus=.
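A small, hypothetical sketch of that validation (not the actual Ray code): a reserved "GPU" key in the resources dict is rejected rather than silently overwritten.

def build_resources(num_cpus=None, num_gpus=None, resources=None):
    resources = dict(resources or {})
    if "GPU" in resources:
        # Per the comment above: GPUs may only be requested via num_gpus.
        raise Exception("Use num_gpus to request GPU resources.")
    if num_cpus is not None:
        resources["CPU"] = num_cpus
    if num_gpus is not None:
        resources["GPU"] = num_gpus
    return resources

# build_resources(num_gpus=2, resources={"GPU": 1}) raises instead of
# overwriting the custom "GPU" entry.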
DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0
# Default resource requirements for actors when some resource requirements are
# specified.
DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0
Does this just mean that when any actor method is invoked, it will not ask for more CPUs?
That's correct (in the case where some resource requirements are specified in the actor decorator).
if num_cpus is None and num_gpus is None and resources == {}:
    # In the default case, actors acquire no resources for
    # their lifetime, and actor methods will require 1 CPU.
    resources["CPU"] = DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE
also another dumb q, what happens when resources["GPU"] is not specified, as is the case here?
Then it is not included in the resource dict (which is the same as having a value of 0).
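Putting the two default cases from this thread together, a hedged sketch (only the two constants are from the diff above; the helper and the "specified case" creation default are illustrative assumptions):

DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE = 0   # from the diff above
DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0  # from the diff above


def actor_cpu_defaults(num_cpus=None, num_gpus=None, resources=None):
    resources = resources or {}
    if num_cpus is None and num_gpus is None and resources == {}:
        # Simple case: the actor holds no resources for its lifetime, and
        # each method call requires 1 CPU.
        return DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE, 1
    # Specified case: the actor holds the requested resources for its
    # lifetime (assumed here to default to 1 CPU), and method calls require
    # no additional CPUs.
    creation_cpus = num_cpus if num_cpus is not None else 1  # assumption
    return creation_cpus, DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE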
pcmoritz left a comment:
LGTM! Nice job :)
stephanie-wang left a comment:
Woo!
This reverts commit 96913be.
This PR treats actor creation like a regular task. That is, when we attempt to create an actor, an actor creation task is scheduled and executed (and when it executes on a worker it turns that worker into an actor).
#1351 was an earlier attempt at this.
Changes to actor resource handling:
Remaining TODO:
- give_task_to_local_scheduler_retry as described in [WIP] Schedule actor creation like a regular task. #1351 (comment).
- Make sure only "unused" workers are turned into actors (decided to hold off on this for now).
- E.g., always send actor creation tasks to the global scheduler (decided not to do this).

Incidentally, this should fix #1716.
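For readers skimming the description, a conceptual, heavily simplified sketch of the core idea (hypothetical names, not Ray's implementation): the actor is created by an ordinary task, and executing that task turns the executing worker into the actor.

class Worker(object):
    """A toy worker that can execute regular tasks and actor creation tasks."""

    def __init__(self):
        self.actor_instance = None

    def execute(self, task):
        if task.get("is_actor_creation"):
            # Executing the actor creation task instantiates the class on
            # this worker, turning it into the actor for later method tasks.
            self.actor_instance = task["cls"](*task.get("args", ()))
            return None
        if task.get("method"):
            # Subsequent actor method tasks run on the same worker.
            method = getattr(self.actor_instance, task["method"])
            return method(*task.get("args", ()))
        return task["func"](*task.get("args", ()))


def actor_creation_task(cls, args, resources):
    # Scheduled exactly like any other task, with its own resource
    # requirements (the actor's lifetime resources).
    return {"is_actor_creation": True, "cls": cls, "args": args,
            "resources": resources}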