Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make VCE use Message and Flower callable #2783

Merged
merged 43 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f271ac7
removed old (unused) clientproxy
jafermarq Jan 10, 2024
bfdf6d4
works
jafermarq Jan 10, 2024
fbad2d6
format
jafermarq Jan 10, 2024
ae07103
ensure Client type returned
jafermarq Jan 10, 2024
70a667a
merge (reset) w/ main
jafermarq Jan 29, 2024
cc9f526
remove unused
jafermarq Jan 29, 2024
36a73dc
w/ Message and Flower callable
jafermarq Jan 29, 2024
9a3c23c
w/ previous
jafermarq Jan 29, 2024
adb4c98
rewrapping client_fn
jafermarq Feb 6, 2024
9828570
wip
jafermarq Feb 6, 2024
046ac41
node_id is of type int
jafermarq Feb 7, 2024
b15ba35
leave `node_id` unused
jafermarq Feb 7, 2024
893d9c9
Merge branch 'add-node-id-to-metadata' into vce-with-taskin-taskres
jafermarq Feb 7, 2024
48914d8
integration
jafermarq Feb 7, 2024
5e8ed70
updated docstrings and tests
jafermarq Feb 7, 2024
3eb115f
wip
jafermarq Feb 8, 2024
87fb2ba
fix
jafermarq Feb 8, 2024
12db978
Merge branch 'main' into add-node-id-to-metadata
jafermarq Feb 8, 2024
daaf39d
Merge branch 'add-node-id-to-metadata' into vce-with-taskin-taskres
jafermarq Feb 8, 2024
4ef8907
Merge branch 'main' into add-node-id-to-metadata
jafermarq Feb 9, 2024
54eae37
Merge branch 'main' into add-node-id-to-metadata
jafermarq Feb 9, 2024
440d01f
Merge branch 'main' into add-node-id-to-metadata
jafermarq Feb 11, 2024
6435638
Merge branch 'add-node-id-to-metadata' into vce-with-taskin-taskres
jafermarq Feb 11, 2024
ce291bf
Merge branch 'main' into add-node-id-to-metadata
panh99 Feb 12, 2024
89f232c
add default values for Metadata and Message
panh99 Feb 12, 2024
09f4e33
Merge branch 'main' into add-node-id-to-metadata
jafermarq Feb 12, 2024
38f1165
fix
jafermarq Feb 12, 2024
8a478b8
Merge branch 'add-node-id-to-metadata' into vce-with-taskin-taskres
jafermarq Feb 12, 2024
30f7453
update w/ `content` and `flowerapp`
jafermarq Feb 12, 2024
9fb9754
merge w/ main
jafermarq Feb 13, 2024
950f0ac
Merge branch 'main' into vce-with-taskin-taskres
jafermarq Feb 14, 2024
afead1d
Merge branch 'main' into vce-with-taskin-taskres
jafermarq Feb 14, 2024
5122793
fix pandas e2e
jafermarq Feb 14, 2024
d4780aa
merge
jafermarq Feb 14, 2024
d1e71b1
.
jafermarq Feb 14, 2024
31f4ded
updated docs; removed unused utility function
jafermarq Feb 14, 2024
87f0041
format
jafermarq Feb 14, 2024
794cd24
Apply suggestions from code review
jafermarq Feb 14, 2024
812fe54
timout->ttl; renaming; other minor fixes
jafermarq Feb 14, 2024
a977a62
renamed DefaultActor
jafermarq Feb 14, 2024
fa51ced
Merge branch 'main' into vce-with-taskin-taskres
jafermarq Feb 16, 2024
ca79c3f
Merge branch 'main' into vce-with-taskin-taskres
jafermarq Feb 16, 2024
d2a8a0c
Merge branch 'main' into vce-with-taskin-taskres
danieljanes Feb 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/source/how-to-run-simulations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Running Flower simulations still require you to define your client class, a stra

def client_fn(cid: str):
# Return a standard Flower client
return MyFlowerClient()
return MyFlowerClient().to_client()

# Launch the simulation
hist = fl.simulation.start_simulation(
Expand Down
6 changes: 1 addition & 5 deletions e2e/pandas/simulation.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import flwr as fl

from client import FlowerClient
from client import client_fn
from strategy import FedAnalytics

def client_fn(cid):
_ = cid
return FlowerClient()

hist = fl.simulation.start_simulation(
client_fn=client_fn,
num_clients=2,
Expand Down
2 changes: 1 addition & 1 deletion src/py/flwr/client/message_handler/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def handle_legacy_message_from_msgtype(
client_fn: ClientFn, message: Message, context: Context
) -> Message:
"""Handle legacy message in the inner most mod."""
client = client_fn("-1")
client = client_fn(str(message.metadata.node_id))

client.set_context(context)

Expand Down
8 changes: 4 additions & 4 deletions src/py/flwr/simulation/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from flwr.server.server_config import ServerConfig
from flwr.server.strategy import Strategy
from flwr.simulation.ray_transport.ray_actor import (
DefaultActor,
ClientAppActor,
VirtualClientEngineActor,
VirtualClientEngineActorPool,
pool_size_from_resources,
Expand Down Expand Up @@ -83,7 +83,7 @@ def start_simulation(
client_manager: Optional[ClientManager] = None,
ray_init_args: Optional[Dict[str, Any]] = None,
keep_initialised: Optional[bool] = False,
actor_type: Type[VirtualClientEngineActor] = DefaultActor,
actor_type: Type[VirtualClientEngineActor] = ClientAppActor,
actor_kwargs: Optional[Dict[str, Any]] = None,
actor_scheduling: Union[str, NodeAffinitySchedulingStrategy] = "DEFAULT",
) -> History:
Expand Down Expand Up @@ -139,10 +139,10 @@ def start_simulation(
keep_initialised: Optional[bool] (default: False)
Set to True to prevent `ray.shutdown()` in case `ray.is_initialized()=True`.

actor_type: VirtualClientEngineActor (default: DefaultActor)
actor_type: VirtualClientEngineActor (default: ClientAppActor)
Optionally specify the type of actor to use. The actor object, which
persists throughout the simulation, will be the process in charge of
running the clients' jobs (i.e. their `fit()` method).
executing a ClientApp wrapping input argument `client_fn`.

actor_kwargs: Optional[Dict[str, Any]] (default: None)
If you want to create your own Actor classes, you might need to pass
Expand Down
64 changes: 28 additions & 36 deletions src/py/flwr/simulation/ray_transport/ray_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,12 @@
from ray import ObjectRef
from ray.util.actor_pool import ActorPool

from flwr import common
from flwr.client import Client, ClientFn
from flwr.client.clientapp import ClientApp
from flwr.common.context import Context
from flwr.common.logger import log
from flwr.simulation.ray_transport.utils import check_clientfn_returns_client
from flwr.common.message import Message

# All possible returns by a client
ClientRes = Union[
common.GetPropertiesRes, common.GetParametersRes, common.FitRes, common.EvaluateRes
]
# A function to be executed by a client to obtain some results
JobFn = Callable[[Client], ClientRes]
ClientAppFn = Callable[[], ClientApp]


class ClientException(Exception):
Expand All @@ -58,27 +52,25 @@ def terminate(self) -> None:

def run(
self,
client_fn: ClientFn,
job_fn: JobFn,
client_app_fn: ClientAppFn,
message: Message,
cid: str,
context: Context,
) -> Tuple[str, ClientRes, Context]:
) -> Tuple[str, Message, Context]:
"""Run a client run."""
# Execute tasks and return result
# Pass message through ClientApp and return a message
# return also cid which is needed to ensure results
# from the pool are correctly assigned to each ClientProxy
try:
# Instantiate client (check 'Client' type is returned)
client = check_clientfn_returns_client(client_fn(cid))
# Inject context
client.set_context(context)
# Run client job
job_results = job_fn(client)
# Retrieve context (potentially updated)
updated_context = client.get_context()
# Load app
app: ClientApp = client_app_fn()

# Handle task message
out_message = app(message=message, context=context)

except Exception as ex:
client_trace = traceback.format_exc()
message = (
mssg = (
"\n\tSomething went wrong when running your client run."
"\n\tClient "
+ cid
Expand All @@ -87,13 +79,13 @@ def run(
+ " was running its run."
"\n\tException triggered on the client side: " + client_trace,
)
raise ClientException(str(message)) from ex
raise ClientException(str(mssg)) from ex

return cid, job_results, updated_context
return cid, out_message, context


@ray.remote
class DefaultActor(VirtualClientEngineActor):
class ClientAppActor(VirtualClientEngineActor):
"""A Ray Actor class that runs client runs.

Parameters
Expand Down Expand Up @@ -237,16 +229,16 @@ def add_actors_to_pool(self, num_actors: int) -> None:
self._idle_actors.extend(new_actors)
self.num_actors += num_actors

def submit(self, fn: Any, value: Tuple[ClientFn, JobFn, str, Context]) -> None:
"""Take idle actor and assign it a client run.
def submit(self, fn: Any, value: Tuple[ClientAppFn, Message, str, Context]) -> None:
"""Take an idle actor and assign it to run a client app and Message.

Submit a job to an actor by first removing it from the list of idle actors, then
check if this actor was flagged to be removed from the pool
check if this actor was flagged to be removed from the pool.
"""
client_fn, job_fn, cid, context = value
app_fn, mssg, cid, context = value
actor = self._idle_actors.pop()
if self._check_and_remove_actor_from_pool(actor):
future = fn(actor, client_fn, job_fn, cid, context)
future = fn(actor, app_fn, mssg, cid, context)
future_key = tuple(future) if isinstance(future, List) else future
self._future_to_actor[future_key] = (self._next_task_index, actor, cid)
self._next_task_index += 1
Expand All @@ -255,7 +247,7 @@ def submit(self, fn: Any, value: Tuple[ClientFn, JobFn, str, Context]) -> None:
self._cid_to_future[cid]["future"] = future_key

def submit_client_job(
self, actor_fn: Any, job: Tuple[ClientFn, JobFn, str, Context]
self, actor_fn: Any, job: Tuple[ClientAppFn, Message, str, Context]
) -> None:
"""Submit a job while tracking client ids."""
_, _, cid, _ = job
Expand Down Expand Up @@ -295,17 +287,17 @@ def _is_future_ready(self, cid: str) -> bool:

return self._cid_to_future[cid]["ready"] # type: ignore

def _fetch_future_result(self, cid: str) -> Tuple[ClientRes, Context]:
def _fetch_future_result(self, cid: str) -> Tuple[Message, Context]:
"""Fetch result and updated context for a VirtualClient from Object Store.

The job submitted by the ClientProxy interfacing with client with cid=cid is
ready. Here we fetch it from the object store and return.
"""
try:
future: ObjectRef[Any] = self._cid_to_future[cid]["future"] # type: ignore
res_cid, res, updated_context = ray.get(
res_cid, out_mssg, updated_context = ray.get(
future
) # type: (str, ClientRes, Context)
) # type: (str, Message, Context)
except ray.exceptions.RayActorError as ex:
log(ERROR, ex)
if hasattr(ex, "actor_id"):
Expand All @@ -322,7 +314,7 @@ def _fetch_future_result(self, cid: str) -> Tuple[ClientRes, Context]:
# Reset mapping
self._reset_cid_to_future_dict(cid)

return res, updated_context
return out_mssg, updated_context

def _flag_actor_for_removal(self, actor_id_hex: str) -> None:
"""Flag actor that should be removed from pool."""
Expand Down Expand Up @@ -409,7 +401,7 @@ def process_unordered_future(self, timeout: Optional[float] = None) -> None:

def get_client_result(
self, cid: str, timeout: Optional[float]
) -> Tuple[ClientRes, Context]:
) -> Tuple[Message, Context]:
"""Get result from VirtualClient with specific cid."""
# Loop until all jobs submitted to the pool are completed. Break early
# if the result for the ClientProxy calling this method is ready
Expand Down
Loading
Loading