Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add ClientApp process function #3977

Merged
merged 40 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dc1f710
Initial commit
chongshenng Aug 8, 2024
df0ccf8
Merge branch 'add-exec-client-app' into add-multiproc-execution
chongshenng Aug 8, 2024
aef1b3e
Init commit
chongshenng Aug 8, 2024
11ed5cf
Merge branch 'main' into add-exec-client-app
chongshenng Aug 8, 2024
1541f18
Merge branch 'main' into add-exec-client-app
chongshenng Aug 9, 2024
eaed17b
Update internal command to flwr-clientapp
chongshenng Aug 9, 2024
4b32077
Update __init__.py
chongshenng Aug 9, 2024
4e7612a
Merge branch 'main' into add-exec-client-app
chongshenng Aug 9, 2024
a16ab06
Merge branch 'main' into add-exec-client-app
danieljanes Aug 10, 2024
ed64df1
Update src/py/flwr/client/supernode/app.py
danieljanes Aug 10, 2024
1a059ee
Update src/py/flwr/client/supernode/app.py
danieljanes Aug 10, 2024
7c16d32
Merge branch 'main' into add-exec-client-app
chongshenng Aug 11, 2024
e269e2e
Address comments
chongshenng Aug 11, 2024
e3ce501
Merge main
chongshenng Aug 12, 2024
b8c4bc1
Update PR
chongshenng Aug 12, 2024
57d0613
Update PR
chongshenng Aug 12, 2024
d6b5b5d
Address comments
chongshenng Aug 12, 2024
27f2e60
Merge main
chongshenng Aug 13, 2024
5cf506f
Merge branch 'main' into add-multiproc-execution
chongshenng Aug 13, 2024
5d30a7c
Refactor code for tests
chongshenng Aug 13, 2024
18f04e0
Update docstring
chongshenng Aug 13, 2024
808c2fb
Merge main
chongshenng Aug 13, 2024
8925e0b
Merge branch 'main' into add-multiproc-execution
danieljanes Aug 15, 2024
1803bab
Merge branch 'main' into add-multiproc-execution
danieljanes Aug 15, 2024
81c2a83
Fix missing import
chongshenng Aug 15, 2024
3ae8021
Fix return type
chongshenng Aug 15, 2024
91cce4e
Change import
chongshenng Aug 15, 2024
1a48975
fix
chongshenng Aug 15, 2024
cd3b888
Set address
chongshenng Aug 15, 2024
ea3956a
Reorder
chongshenng Aug 15, 2024
f6b9f85
Update src/py/flwr/client/app.py
danieljanes Aug 15, 2024
2a15782
Add exception handling
chongshenng Aug 16, 2024
d3eac24
Merge branch 'main' into add-multiproc-execution
chongshenng Aug 16, 2024
42a0336
Lint
chongshenng Aug 16, 2024
13f557f
add
chongshenng Aug 16, 2024
1d514af
Remove background from name
chongshenng Aug 16, 2024
08153ac
Move on_channel_state_change to top-level
chongshenng Aug 16, 2024
0ea3494
Merge branch 'main' into add-multiproc-execution
danieljanes Aug 16, 2024
7f13cdf
Lint
chongshenng Aug 16, 2024
918e1aa
Merge branch 'main' into add-multiproc-execution
danieljanes Aug 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/py/flwr/client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pathlib import Path
from typing import Callable, ContextManager, Dict, Optional, Tuple, Type, Union

import grpc
from cryptography.hazmat.primitives.asymmetric import ec
from grpc import RpcError

Expand All @@ -43,13 +44,18 @@
from flwr.common.message import Error
from flwr.common.retry_invoker import RetryInvoker, RetryState, exponential
from flwr.common.typing import Fab, Run, UserConfig
from flwr.proto.clientappio_pb2_grpc import add_ClientAppIoServicer_to_server
from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server

from .grpc_adapter_client.connection import grpc_adapter
from .grpc_client.connection import grpc_connection
from .grpc_rere_client.connection import grpc_request_response
from .message_handler.message_handler import handle_control_message
from .node_state import NodeState
from .numpy_client import NumPyClient
from .process.clientappio_servicer import ClientAppIoServicer

ADDRESS_CLIENTAPPIO_API_GRPC_RERE = "0.0.0.0:9094"


def _check_actionable_client(
Expand Down Expand Up @@ -667,3 +673,22 @@ def signal_handler(sig, frame): # type: ignore

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def run_clientappio_api_grpc(
address: str = ADDRESS_CLIENTAPPIO_API_GRPC_RERE,
) -> Tuple[grpc.Server, grpc.Server]:
"""Run ClientAppIo API gRPC server."""
clientappio_servicer: grpc.Server = ClientAppIoServicer()
clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server
clientappio_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(
clientappio_servicer,
clientappio_add_servicer_to_server_fn,
),
server_address=address,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
)
log(INFO, "Starting Flower ClientAppIo gRPC server on %s", address)
clientappio_grpc_server.start()
return clientappio_servicer, clientappio_grpc_server
chongshenng marked this conversation as resolved.
Show resolved Hide resolved
122 changes: 122 additions & 0 deletions src/py/flwr/client/process/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower background ClientApp."""
chongshenng marked this conversation as resolved.
Show resolved Hide resolved

from logging import DEBUG, ERROR, INFO
from typing import Tuple

import grpc

from flwr.client.client_app import ClientApp
from flwr.common import Context, Message
from flwr.common.grpc import create_channel
from flwr.common.logger import log
from flwr.common.serde import (
context_from_proto,
context_to_proto,
message_from_proto,
message_to_proto,
run_from_proto,
)
from flwr.common.typing import Run

# pylint: disable=E0611
from flwr.proto.clientappio_pb2 import (
PullClientAppInputsRequest,
PullClientAppInputsResponse,
PushClientAppOutputsRequest,
PushClientAppOutputsResponse,
)
from flwr.proto.clientappio_pb2_grpc import ClientAppIoStub

from .utils import _get_load_client_app_fn


def _run_background_client( # pylint: disable=R0914
chongshenng marked this conversation as resolved.
Show resolved Hide resolved
address: str,
token: int,
) -> None:
"""Run background Flower ClientApp process.
chongshenng marked this conversation as resolved.
Show resolved Hide resolved

Parameters
----------
address : str
Address of SuperNode
token : int
Unique SuperNode token for ClientApp-SuperNode authentication
"""

def on_channel_state_change(channel_connectivity: str) -> None:
"""Log channel connectivity."""
log(DEBUG, channel_connectivity)
chongshenng marked this conversation as resolved.
Show resolved Hide resolved

channel = create_channel(
server_address=address,
insecure=True,
)
channel.subscribe(on_channel_state_change)

try:
stub = ClientAppIoStub(channel)

# Pull Message, Context, and Run from SuperNode
message, context, run = pull_message(stub=stub, token=token)
danieljanes marked this conversation as resolved.
Show resolved Hide resolved

load_client_app_fn = _get_load_client_app_fn(
default_app_ref="",
app_path=None,
multi_app=True,
flwr_dir=None,
)

# Load ClientApp
client_app: ClientApp = load_client_app_fn(run.fab_id, run.fab_version)

# Execute ClientApp
reply_message = client_app(message=message, context=context)
chongshenng marked this conversation as resolved.
Show resolved Hide resolved

# Push Message and Context to SuperNode
_ = push_message(token=token, message=reply_message, context=context, stub=stub)
chongshenng marked this conversation as resolved.
Show resolved Hide resolved
except KeyboardInterrupt:
log(INFO, "Closing connection")
except grpc.RpcError as e:
log(ERROR, "GRPC error occurred: %s", str(e))
finally:
channel.close()


def pull_message(stub: grpc.Channel, token: int) -> Tuple[Message, Context, Run]:
"""Pull message from SuperNode to ClientApp."""
res: PullClientAppInputsResponse = stub.PullClientAppInputs(
PullClientAppInputsRequest(token=token)
)
message = message_from_proto(res.message)
context = context_from_proto(res.context)
run = run_from_proto(res.run)
return message, context, run


def push_message(
stub: grpc.Channel, token: int, message: Message, context: Context
) -> PushClientAppOutputsResponse:
"""Push message to SuperNode from ClientApp."""
proto_message = message_to_proto(message)
proto_context = context_to_proto(context)
res: PushClientAppOutputsResponse = stub.PushClientAppOutputs(
PushClientAppOutputsRequest(
token=token, message=proto_message, context=proto_context
)
)
return res
108 changes: 108 additions & 0 deletions src/py/flwr/client/process/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower ClientApp loading utils."""

from logging import DEBUG
from pathlib import Path
from typing import Callable, Optional

from flwr.client.client_app import ClientApp, LoadClientAppError
from flwr.common.config import (
get_flwr_dir,
get_metadata_from_config,
get_project_config,
get_project_dir,
)
from flwr.common.logger import log
from flwr.common.object_ref import load_app, validate


def _get_load_client_app_fn(
chongshenng marked this conversation as resolved.
Show resolved Hide resolved
default_app_ref: str,
app_path: Optional[str],
multi_app: bool,
flwr_dir: Optional[str] = None,
) -> Callable[[str, str], ClientApp]:
"""Get the load_client_app_fn function.

If `multi_app` is True, this function loads the specified ClientApp
based on `fab_id` and `fab_version`. If `fab_id` is empty, a default
ClientApp will be loaded.

If `multi_app` is False, it ignores `fab_id` and `fab_version` and
loads a default ClientApp.
"""
if not multi_app:
log(
DEBUG,
"Flower SuperNode will load and validate ClientApp `%s`",
default_app_ref,
)

valid, error_msg = validate(default_app_ref, project_dir=app_path)
if not valid and error_msg:
raise LoadClientAppError(error_msg) from None

def _load(fab_id: str, fab_version: str) -> ClientApp:
runtime_app_dir = Path(app_path if app_path else "").absolute()
# If multi-app feature is disabled
if not multi_app:
# Set app reference
client_app_ref = default_app_ref
# If multi-app feature is enabled but app directory is provided
elif app_path is not None:
config = get_project_config(runtime_app_dir)
this_fab_version, this_fab_id = get_metadata_from_config(config)

if this_fab_version != fab_version or this_fab_id != fab_id:
raise LoadClientAppError(
f"FAB ID or version mismatch: Expected FAB ID '{this_fab_id}' and "
f"FAB version '{this_fab_version}', but received FAB ID '{fab_id}' "
f"and FAB version '{fab_version}'.",
) from None

# log(WARN, "FAB ID is not provided; the default ClientApp will be loaded.")

# Set app reference
client_app_ref = config["tool"]["flwr"]["app"]["components"]["clientapp"]
# If multi-app feature is enabled
else:
try:
runtime_app_dir = get_project_dir(
fab_id, fab_version, get_flwr_dir(flwr_dir)
)
config = get_project_config(runtime_app_dir)
except Exception as e:
raise LoadClientAppError("Failed to load ClientApp") from e

# Set app reference
client_app_ref = config["tool"]["flwr"]["app"]["components"]["clientapp"]

# Load ClientApp
log(
DEBUG,
"Loading ClientApp `%s`",
client_app_ref,
)
client_app = load_app(client_app_ref, LoadClientAppError, runtime_app_dir)

if not isinstance(client_app, ClientApp):
raise LoadClientAppError(
f"Attribute {client_app_ref} is not of type {ClientApp}",
) from None

return client_app

return _load
Loading