Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add ClientApp multiprocess execution #3978

Merged
merged 25 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions src/py/flwr/client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Flower client app."""

import signal
import subprocess
import sys
import time
from dataclasses import dataclass
Expand All @@ -35,6 +36,7 @@
from flwr.common.address import parse_address
from flwr.common.constant import (
MISSING_EXTRA_REST,
RUN_ID_NUM_BYTES,
TRANSPORT_TYPE_GRPC_ADAPTER,
TRANSPORT_TYPE_GRPC_BIDI,
TRANSPORT_TYPE_GRPC_RERE,
Expand All @@ -48,14 +50,15 @@
from flwr.common.typing import Fab, Run, UserConfig
from flwr.proto.clientappio_pb2_grpc import add_ClientAppIoServicer_to_server
from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server
from flwr.server.superlink.state.utils import generate_rand_int_from_bytes

from .grpc_adapter_client.connection import grpc_adapter
from .grpc_client.connection import grpc_connection
from .grpc_rere_client.connection import grpc_request_response
from .message_handler.message_handler import handle_control_message
from .node_state import NodeState
from .numpy_client import NumPyClient
from .process.clientappio_servicer import ClientAppIoServicer
from .process.clientappio_servicer import ClientAppIoInputs, ClientAppIoServicer

ADDRESS_CLIENTAPPIO_API_GRPC_RERE = "0.0.0.0:9094"

Expand Down Expand Up @@ -204,6 +207,8 @@ def start_client_internal(
max_retries: Optional[int] = None,
max_wait_time: Optional[float] = None,
flwr_path: Optional[Path] = None,
isolate: Optional[bool] = False,
supernode_address: Optional[str] = ADDRESS_CLIENTAPPIO_API_GRPC_RERE,
) -> None:
"""Start a Flower client node which connects to a Flower server.

Expand Down Expand Up @@ -251,6 +256,13 @@ class `flwr.client.Client` (default: None)
If set to None, there is no limit to the total time.
flwr_path: Optional[Path] (default: None)
The fully resolved path containing installed Flower Apps.
isolate : Optional[bool] (default: False)
Whether to run `ClientApp` in a separate process. By default, this value is
`False`, and the `ClientApp` runs in the same process as the SuperNode. If
`True`, the `ClientApp` runs in an isolated process and communicates using
gRPC at the address `supernode_address`.
supernode_address : Optional[str] (default: `ADDRESS_CLIENTAPPIO_API_GRPC_RERE`)
The SuperNode gRPC server address.
"""
if insecure is None:
insecure = root_certificates is None
Expand All @@ -276,6 +288,11 @@ def _load_client_app(_1: str, _2: str) -> ClientApp:

load_client_app_fn = _load_client_app

if isolate and supernode_address is not None:
_clientappio_grpc_server, clientappio_servicer = run_clientappio_api_grpc(
address=supernode_address
)
danieljanes marked this conversation as resolved.
Show resolved Hide resolved

# At this point, only `load_client_app_fn` should be used
# Both `client` and `client_fn` must not be used directly

Expand Down Expand Up @@ -435,11 +452,43 @@ def _on_backoff(retry_state: RetryState) -> None:

# Handle app loading and task message
try:
# Load ClientApp instance
client_app: ClientApp = load_client_app_fn(fab_id, fab_version)
if isolate and supernode_address is not None:
# Generate SuperNode token
token: int = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES)
danieljanes marked this conversation as resolved.
Show resolved Hide resolved

# Share Message and Context with servicer
clientappio_servicer.set_inputs(
ClientAppIoInputs(
message=message,
context=context,
run=run,
token=token,
)
)
# Run ClientApp
command = [
danieljanes marked this conversation as resolved.
Show resolved Hide resolved
"flwr-clientapp",
"--supernode",
supernode_address,
"--token",
str(token),
]
subprocess.run(
command,
stdout=None,
stderr=None,
check=True,
)
outputs = clientappio_servicer.get_outputs()
reply_message, context = outputs.message, outputs.context
else:
# Load ClientApp instance
client_app: ClientApp = load_client_app_fn(
fab_id, fab_version
)

# Execute ClientApp
reply_message = client_app(message=message, context=context)
# Execute ClientApp
reply_message = client_app(message=message, context=context)
except Exception as ex: # pylint: disable=broad-exception-caught

# Legacy grpc-bidi
Expand Down Expand Up @@ -685,9 +734,7 @@ def signal_handler(sig, frame): # type: ignore
signal.signal(signal.SIGTERM, signal_handler)


def run_clientappio_api_grpc(
address: str = ADDRESS_CLIENTAPPIO_API_GRPC_RERE,
) -> Tuple[grpc.Server, ClientAppIoServicer]:
def run_clientappio_api_grpc(address: str) -> Tuple[grpc.Server, ClientAppIoServicer]:
"""Run ClientAppIo API gRPC server."""
clientappio_servicer: grpc.Server = ClientAppIoServicer()
clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server
Expand Down
11 changes: 5 additions & 6 deletions src/py/flwr/client/process/clientappio_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ def PushClientAppOutputs(
) -> PushClientAppOutputsResponse:
"""Push Message and Context."""
log(DEBUG, "ClientAppIo.PushClientAppOutputs")
if self.clientapp_output is None:
raise ValueError(
"ClientAppIoOutputs not set before calling `PushClientAppOutputs`."
)
if self.clientapp_input is None:
raise ValueError(
"ClientAppIoInputs not set before calling `PushClientAppOutputs`."
Expand All @@ -109,8 +105,11 @@ def PushClientAppOutputs(
)
try:
# Update Message and Context
self.clientapp_output.message = message_from_proto(request.message)
self.clientapp_output.context = context_from_proto(request.context)
self.clientapp_output = ClientAppIoOutputs(
message=message_from_proto(request.message),
context=context_from_proto(request.context),
)

# Set status
code = typing.ClientAppOutputCode.SUCCESS
status = typing.ClientAppOutputStatus(code=code, message="Success")
Expand Down
6 changes: 3 additions & 3 deletions src/py/flwr/client/process/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ def on_channel_state_change(channel_connectivity: str) -> None:


def run_clientapp( # pylint: disable=R0914
address: str,
supernode: str,
token: int,
) -> None:
"""Run Flower ClientApp process.

Parameters
----------
address : str
supernode : str
Address of SuperNode
token : int
Unique SuperNode token for ClientApp-SuperNode authentication
"""
channel = create_channel(
server_address=address,
server_address=supernode,
insecure=True,
)
channel.subscribe(on_channel_state_change)
Expand Down
19 changes: 16 additions & 3 deletions src/py/flwr/client/supernode/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ def run_supernode() -> None:
max_retries=args.max_retries,
max_wait_time=args.max_wait_time,
node_config=parse_config_args([args.node_config]),
isolate=args.isolate,
supernode_address=args.supernode_address,
)

# Graceful shutdown
Expand Down Expand Up @@ -121,7 +123,7 @@ def flwr_clientapp() -> None:
description="Run a Flower ClientApp",
)
parser.add_argument(
"--address",
"--supernode",
help="Address of SuperNode ClientAppIo gRPC servicer",
)
parser.add_argument(
Expand All @@ -133,10 +135,10 @@ def flwr_clientapp() -> None:
DEBUG,
"Staring isolated `ClientApp` connected to SuperNode ClientAppIo at %s "
"with the token %s",
args.address,
args.supernode,
args.token,
)
run_clientapp(address=args.address, token=int(args.token))
run_clientapp(supernode=args.supernode, token=int(args.token))


def _warn_deprecated_server_arg(args: argparse.Namespace) -> None:
Expand Down Expand Up @@ -223,6 +225,17 @@ def _parse_args_run_supernode() -> argparse.ArgumentParser:
- `$HOME/.flwr/` in all other cases
""",
)
parser.add_argument(
"--isolate",
action="store_true",
help="Run `ClientApp` in an isolated subprocess. By default, `ClientApp` "
"runs in the same process that executes the SuperNode.",
)
parser.add_argument(
"--supernode-address",
default="0.0.0.0:9094",
help="Set the SuperNode gRPC server address. Defaults to `0.0.0.0:9094`.",
)

return parser

Expand Down