Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Added conditional enqueue #4

Open
wants to merge 1 commit into
base: pipeline-related-features
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/ray/experimental/serve/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from ray.experimental.serve.backend_config import BackendConfig
from ray.experimental.serve.policy import RoutePolicy
from ray.experimental.serve.constants import RESULT_KEY, PREDICATE_KEY
if sys.version_info < (3, 0):
raise ImportError("serve is Python 3 only.")

Expand All @@ -10,5 +11,5 @@
__all__ = [
"init", "create_backend", "create_endpoint", "link", "split", "get_handle",
"stat", "set_backend_config", "get_backend_config", "BackendConfig",
"RoutePolicy", "accept_batch"
"RoutePolicy", "accept_batch", "RESULT_KEY", "PREDICATE_KEY"
]
30 changes: 26 additions & 4 deletions python/ray/experimental/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ def get_backend_config(backend_tag):
def create_backend(func_or_class,
backend_tag,
*actor_init_args,
backend_config=BackendConfig()):
backend_config=BackendConfig(),
predicate_function=None):
"""Create a backend using func_or_class and assign backend_tag.

Args:
Expand All @@ -216,6 +217,8 @@ def create_backend(func_or_class,
for starting a backend.
*actor_init_args (optional): the argument to pass to the class
initialization method.
predicate_function(callable): a function which returns boolean values
for conditional enqueuing.
"""
assert isinstance(backend_config,
BackendConfig), ("backend_config must be"
Expand All @@ -234,16 +237,26 @@ def create_backend(func_or_class,
if should_accept_batch and not hasattr(func_or_class,
"serve_accept_batch"):
raise batch_annotation_not_found

if backend_config.enable_predicate and predicate_function is None:
raise RayServeException(
"For enabling predicate, Specify predicate_function.")
# arg list for a fn is function itself
arg_list = [func_or_class]
# add predicate function to args
if backend_config.enable_predicate:
arg_list.append(predicate_function)

# ignore lint on lambda expression
creator = lambda kwrgs: TaskRunnerActor._remote(**kwrgs) # noqa: E731
elif inspect.isclass(func_or_class):
if should_accept_batch and not hasattr(func_or_class.__call__,
"serve_accept_batch"):
raise batch_annotation_not_found

if backend_config.enable_predicate and not hasattr(
func_or_class, "__predicate__"):
raise RayServeException(
"For enabling predicate, implement __predicate__ function "
"in backend class.")
# Python inheritance order is right-to-left. We put RayServeMixin
# on the left to make sure its methods are not overriden.
@ray.remote
Expand Down Expand Up @@ -297,7 +310,10 @@ def _start_replica(backend_tag):
# Setup the worker
ray.get(
runner_handle._ray_serve_setup.remote(
backend_tag, global_state.init_or_get_router(), runner_handle))
backend_tag,
global_state.init_or_get_router(),
runner_handle,
predicate_required=backend_config.enable_predicate))
runner_handle._ray_serve_fetch.remote()

# Register the worker in config tables as well as metric monitor
Expand Down Expand Up @@ -392,10 +408,16 @@ def split(endpoint_name, traffic_policy_dictionary):
assert isinstance(traffic_policy_dictionary,
dict), "Traffic policy must be dictionary"
prob = 0
backend_predicates = []
for backend, weight in traffic_policy_dictionary.items():
prob += weight
assert (backend in global_state.backend_table.list_backends()
), "backend {} is not registered".format(backend)
backend_predicates.append(
global_state.backend_table.get_backend_predicate(backend))
assert len(set(backend_predicates)) == 1, ("Provided backends are not"
"consistent wrt to predicate"
"feature")
assert np.isclose(
prob, 1,
atol=0.02), "weights must sum to 1, currently it sums to {}".format(
Expand Down
10 changes: 7 additions & 3 deletions python/ray/experimental/serve/backend_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
class BackendConfig:
# configs not needed for actor creation when
# instantiating a replica
_serve_configs = ["_num_replicas", "max_batch_size"]
_serve_configs = ["_num_replicas", "max_batch_size", "enable_predicate"]

# configs which when changed leads to restarting
# the existing replicas.
restart_on_change_fields = ["resources", "num_cpus", "num_gpus"]
restart_on_change_fields = [
"resources", "num_cpus", "num_gpus", "enable_predicate"
]

def __init__(self,
num_replicas=1,
Expand All @@ -17,7 +19,8 @@ def __init__(self,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None):
object_store_memory=None,
enable_predicate=False):
"""
Class for defining backend configuration.
"""
Expand All @@ -32,6 +35,7 @@ def __init__(self,
self.num_gpus = num_gpus
self.memory = memory
self.object_store_memory = object_store_memory
self.enable_predicate = enable_predicate

@property
def num_replicas(self):
Expand Down
7 changes: 7 additions & 0 deletions python/ray/experimental/serve/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@

#: HTTP Port
DEFAULT_HTTP_PORT = 8000

#: Return ObjectIDs keys for a dictionary
RESULT_KEY = "result"
PREDICATE_KEY = "predicate"

# default value to pass when the Enqueue Predicate is False
PREDICATE_DEFAULT_VALUE = "predicate-false"
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
Ray serve conditional pipeline example
"""
import ray
import ray.experimental.serve as serve
from ray.experimental.serve import BackendConfig

# initialize ray serve system.
# blocking=True will wait for HTTP server to be ready to serve request.
serve.init(blocking=True)


# This is an example of conditional backend implementation
def echo_v1(_, num):
    """Identity backend: echo the numeric payload back unchanged."""
    result = num
    return result


def echo_v1_predicate(num):
    """Predicate for the echo:v1 backend.

    Returns True exactly when ``num`` is strictly below the 0.5
    threshold, i.e. when the downstream conditional stage should run.
    """
    threshold = 0.5
    return num < threshold


def echo_v2(_, relay=""):
    """Wrap the relayed payload in an ``echo_v2(...)`` marker string."""
    return "echo_v2({})".format(relay)


def echo_v3(_, relay=""):
    """Wrap the relayed payload in an ``echo_v3(...)`` marker string."""
    return "echo_v3({})".format(relay)


# an endpoint is associated with an http URL.
serve.create_endpoint("my_endpoint1", "/echo1")
serve.create_endpoint("my_endpoint2", "/echo2")
serve.create_endpoint("my_endpoint3", "/echo3")

# create backends
# echo:v1 enables the predicate feature, so it must also be given a
# predicate_function; each request through it produces both a result
# and a boolean predicate value.
serve.create_backend(
    echo_v1,
    "echo:v1",
    backend_config=BackendConfig(enable_predicate=True),
    predicate_function=echo_v1_predicate)
serve.create_backend(echo_v2, "echo:v2")
serve.create_backend(echo_v3, "echo:v3")

# link service to backends
serve.link("my_endpoint1", "echo:v1")
serve.link("my_endpoint2", "echo:v2")
serve.link("my_endpoint3", "echo:v3")

# get the handle of the endpoints
handle1 = serve.get_handle("my_endpoint1")
handle2 = serve.get_handle("my_endpoint2")
handle3 = serve.get_handle("my_endpoint3")

# Drive the pipeline once below the 0.5 threshold used by
# echo_v1_predicate (predicate True) and once above it (predicate
# False), exercising both branches of the conditional enqueue.
for number in [0.2, 0.8]:
    # Pre-allocate ObjectIDs so later stages can be enqueued before
    # earlier results exist — the whole pipeline stays asynchronous.
    first_object_id = ray.ObjectID.from_random()
    predicate_object_id = ray.ObjectID.from_random()
    # Stage 1: echo:v1 fills in both its result and its predicate value.
    handle1.remote(
        num=number,
        return_object_ids={
            serve.RESULT_KEY: first_object_id,
            serve.PREDICATE_KEY: predicate_object_id
        })
    second_object_id = ray.ObjectID.from_random()

    # Stage 2: conditionally enqueued on the stage-1 predicate.
    # default_value=("kwargs", "relay") selects which argument to fall
    # back to when the predicate is False — presumably the 'relay'
    # value is passed through as the stage result; confirm against the
    # router implementation.
    return_val = handle2.remote(
        relay=first_object_id,
        predicate_condition=predicate_object_id,
        default_value=("kwargs", "relay"),
        return_object_ids={serve.RESULT_KEY: second_object_id})

    # Calls that supply return_object_ids are fire-and-forget.
    assert return_val is None
    # Stage 3: unconditional; ray.get blocks for the final result.
    result = ray.get(handle3.remote(relay=second_object_id))
    print("For number : {} the whole pipeline output is : {}".format(
        number, result))
6 changes: 3 additions & 3 deletions python/ray/experimental/serve/examples/echo_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,22 @@ def echo_v4(_, relay1="", relay2=""):
# asynchronous! All the remote calls below are completely asynchronous
temp1 = handle2.remote(
relay=first_object_id,
return_object_ids=[second_object_id],
return_object_ids={serve.RESULT_KEY: second_object_id},
slo_ms=wall_clock_slo,
is_wall_clock_time=True)

# check for this call to be asynchronous
assert temp1 is None
handle3.remote(
relay=first_object_id,
return_object_ids=[third_object_id],
return_object_ids={serve.RESULT_KEY: third_object_id},
slo_ms=wall_clock_slo,
is_wall_clock_time=True)
fourth_object_id = ray.ObjectID.from_random()
temp2 = handle4.remote(
relay1=second_object_id,
relay2=third_object_id,
return_object_ids=[fourth_object_id],
return_object_ids={serve.RESULT_KEY: fourth_object_id},
slo_ms=wall_clock_slo,
is_wall_clock_time=True)
assert temp2 is None
Expand Down
38 changes: 35 additions & 3 deletions python/ray/experimental/serve/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from ray.experimental import serve
from ray.experimental.serve.context import TaskContext
from ray.experimental.serve.exceptions import RayServeException
from ray.experimental.serve.constants import DEFAULT_HTTP_ADDRESS
from ray.experimental.serve.constants import (DEFAULT_HTTP_ADDRESS,
PREDICATE_DEFAULT_VALUE)
from ray.experimental.serve.request_params import RequestParams, RequestInfo


Expand Down Expand Up @@ -40,6 +41,26 @@ def _fix_kwarg_name(self, name):
return "request_slo_ms"
return name

def _check_default_value(self, default_value, len_args, kwargs_keys):
    """Validate the ``default_value`` option accepted by ``remote``.

    ``default_value`` is either the ``PREDICATE_DEFAULT_VALUE``
    sentinel (no fallback requested) or a 2-tuple
    ``('args', index)`` / ``('kwargs', key)`` selecting one of the
    arguments actually supplied to this call.

    Args:
        default_value: sentinel or 2-tuple selecting an argument.
        len_args (int): number of positional args passed to ``remote``.
        kwargs_keys (list): keyword-argument names passed to ``remote``.

    Raises:
        ValueError: if ``default_value`` is malformed or selects a
            nonexistent argument.
    """
    # The sentinel means no fallback was requested; nothing to check.
    if default_value == PREDICATE_DEFAULT_VALUE:
        return
    if not isinstance(default_value, tuple):
        raise ValueError("The default value must be a tuple.")
    if len(default_value) != 2:
        raise ValueError(
            "Specify default_value in format: ('args',arg_index)"
            " or ('kwargs', kwargs_key)")
    kind, selector = default_value
    if kind not in ("args", "kwargs"):
        raise ValueError(
            "First value of default_value must be: 'args' or 'kwargs'.")
    if kind == "args":
        # The index must address a positional arg actually supplied.
        # Reject non-int selectors (previously raised an opaque
        # TypeError on comparison) and negative/out-of-range indices.
        if not isinstance(selector, int) or not 0 <= selector < len_args:
            # Bug fix: message previously said "currently" instead of
            # "correctly".
            raise ValueError("Specify the args index correctly!")
    elif selector not in kwargs_keys:
        raise ValueError("Specify the kwargs key correctly!")

def __init__(self, router_handle, endpoint_name):
self.router_handle = router_handle
self.endpoint_name = endpoint_name
Expand All @@ -65,11 +86,22 @@ def remote(self, *args, **kwargs):
except ValueError as e:
raise RayServeException(str(e))

# create request parameters required for enqueuing the request
# check and pop predicate_condition and default value
# specified while enqueuing
predicate_condition = kwargs.pop("predicate_condition", True)
default_value = kwargs.pop("default_value", PREDICATE_DEFAULT_VALUE)
try:
self._check_default_value(default_value, len(args),
list(kwargs.keys()))
except ValueError as e:
raise RayServeException(str(e))

# create request parameters required for enqueuing the request
request_params = RequestParams(self.endpoint_name, TaskContext.Python,
**request_param_kwargs)
req_info_object_id = self.router_handle.enqueue_request.remote(
request_params, *args, **kwargs)
request_params, predicate_condition, default_value, *args,
**kwargs)

# check if it is necessary to wait for enqueue to be completed
# NOTE: This will make remote call completely non-blocking for
Expand Down
5 changes: 5 additions & 0 deletions python/ray/experimental/serve/kv_store_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ def get_backend_creator(self, backend_tag):
def list_backends(self):
return list(self.backend_table.as_dict().keys())

def get_backend_predicate(self, backend_tag):
    """Return the stored ``enable_predicate`` flag for ``backend_tag``.

    Gives ``None`` when the backend has no recorded info or the flag
    was never stored.
    """
    raw_info = self.backend_info.get(backend_tag, "{}")
    return json.loads(raw_info).get("enable_predicate")

def list_replicas(self, backend_tag: str):
    """Return the replica list recorded for ``backend_tag``.

    An unknown backend yields an empty list (the stored default
    is the JSON text ``"[]"``).
    """
    stored = self.replica_table.get(backend_tag, "[]")
    return json.loads(stored)

Expand Down
Loading