Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add strict FAB execution on SuperNode #4451

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
connection: [secure, insecure]
engine: [deployment-engine, simulation-engine]
authentication: [no-auth, client-auth]
fab-install: [manual, auto]
exclude:
- connection: insecure
authentication: client-auth
Expand All @@ -71,7 +72,8 @@ jobs:
Python ${{ matrix.python-version }} /
${{ matrix.connection }} /
${{ matrix.authentication }} /
${{ matrix.engine }}
${{ matrix.engine }} /
${{ matrix.fab-install }}
defaults:
run:
working-directory: e2e/${{ matrix.directory }}
Expand Down Expand Up @@ -107,7 +109,7 @@ jobs:
${{ matrix.authentication }} /
${{ matrix.engine }}
working-directory: e2e/${{ matrix.directory }}
run: ./../test_exec_api.sh "${{ matrix.connection }}" "${{ matrix.authentication}}" "${{ matrix.engine }}"
run: ./../test_exec_api.sh "${{ matrix.connection }}" "${{ matrix.authentication}}" "${{ matrix.engine }}" "${{ matrix.fab-install }}"

frameworks:
runs-on: ubuntu-22.04
Expand Down
15 changes: 13 additions & 2 deletions e2e/test_exec_api.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ case "$3" in
;;
esac

# Set FAB install mode
case $4 in
manual)
fab_install_arg="preinstall"
;;
auto)
fab_install_arg="autoinstall"
;;
esac

# Create and install Flower app
flwr new e2e-tmp-test --framework numpy --username flwrlabs
Expand Down Expand Up @@ -84,13 +93,15 @@ sleep 2
if [ "$3" = "deployment-engine" ]; then
timeout 2m flower-supernode ./ $client_arg \
--superlink $server_address $client_auth_1 \
--node-config "partition-id=0 num-partitions=2" --max-retries 0 &
--node-config "partition-id=0 num-partitions=2" --max-retries 0 \
--fab-install-mode $fab_install_arg &
cl1_pid=$!
sleep 2

timeout 2m flower-supernode ./ $client_arg \
--superlink $server_address $client_auth_2 \
--node-config "partition-id=1 num-partitions=2" --max-retries 0 &
--node-config "partition-id=1 num-partitions=2" --max-retries 0 \
--fab-install-mode $fab_install_arg &
cl2_pid=$!
sleep 2
fi
Expand Down
18 changes: 9 additions & 9 deletions src/py/flwr/cli/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ def build(
)
raise typer.Exit(code=1)

if not is_valid_project_name(app.name):
typer.secho(
f"❌ The project name {app.name} is invalid, "
"a valid project name must start with a letter, "
"and can only contain letters, digits, and hyphens.",
fg=typer.colors.RED,
bold=True,
)
raise typer.Exit(code=1)
# if not is_valid_project_name(app.name):
# typer.secho(
# f"❌ The project name {app.name} is invalid, "
# "a valid project name must start with a letter, "
# "and can only contain letters, digits, and hyphens.",
# fg=typer.colors.RED,
# bold=True,
# )
# raise typer.Exit(code=1)

conf, errors, warnings = load_and_validate(app / "pyproject.toml")
if conf is None:
Expand Down
44 changes: 41 additions & 3 deletions src/py/flwr/client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import time
from contextlib import AbstractContextManager
from dataclasses import dataclass
from logging import ERROR, INFO, WARN
from logging import DEBUG, ERROR, INFO, WARN
from pathlib import Path
from typing import Callable, Optional, Union, cast

import grpc
from cryptography.hazmat.primitives.asymmetric import ec
from grpc import RpcError

from flwr.cli.build import build
from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.client.client import Client
Expand All @@ -36,8 +37,10 @@
from flwr.client.typing import ClientFnExt
from flwr.common import GRPC_MAX_MESSAGE_LENGTH, Context, EventType, Message, event
from flwr.common.address import parse_address
from flwr.common.config import get_project_dir_from_hash
from flwr.common.constant import (
CLIENTAPPIO_API_DEFAULT_ADDRESS,
FAB_MODE_PREINSTALL,
ISOLATION_MODE_PROCESS,
ISOLATION_MODE_SUBPROCESS,
MISSING_EXTRA_REST,
Expand Down Expand Up @@ -217,6 +220,7 @@ def start_client_internal(
flwr_path: Optional[Path] = None,
isolation: Optional[str] = None,
supernode_address: Optional[str] = CLIENTAPPIO_API_DEFAULT_ADDRESS,
fab_install_mode: Optional[str] = "autoinstall",
) -> None:
"""Start a Flower client node which connects to a Flower server.

Expand Down Expand Up @@ -278,6 +282,11 @@ class `flwr.client.Client` (default: None)
process and communicates using gRPC at the address `supernode_address`.
supernode_address : Optional[str] (default: `CLIENTAPPIO_API_DEFAULT_ADDRESS`)
The SuperNode gRPC server address.
fab_install_mode : Optional[str] (default: "autoinstall")
FAB install mode for SuperNode. Possible values are `autoinstall` and
`preinstall`. Defaults to `autoinstall`, which installs the FAB automatically
when the SuperNode receives the FAB. If `preinstall`, the FAB must be installed
before the SuperNode receives it.
"""
if insecure is None:
insecure = root_certificates is None
Expand Down Expand Up @@ -451,12 +460,41 @@ def _on_backoff(retry_state: RetryState) -> None:
runs[run_id] = Run(run_id, "", "", "", {})

run: Run = runs[run_id]
fab: Optional[Fab]
if get_fab is not None and run.fab_hash:
fab = get_fab(run.fab_hash)
if fab_install_mode == FAB_MODE_PREINSTALL:
log(
DEBUG,
"SuperNode running in `%s` mode, "
"retrieving FAB with hash %s from %s",
FAB_MODE_PREINSTALL,
run.fab_hash,
flwr_path,
)
installed_fab_path = get_project_dir_from_hash(
run.fab_hash, flwr_path
)
# Get content from installed FAB and create Fab instance
log(INFO, installed_fab_path)
fab_path, fab_hash = build(installed_fab_path)
fab_content = Path(fab_path).read_bytes()
# fab_content = Path(
# f"{str(installed_fab_path)}.fab"
# ).read_bytes()
if fab_hash != run.fab_hash:
raise ValueError(
f"FAB hash mismatch. Expected: {run.fab_hash}, "
f"Found: {fab_hash}"
)
fab = Fab(run.fab_hash, fab_content)
else:
# Retrieve FAB from SuperLink
fab = get_fab(run.fab_hash)
fab_id, fab_version = get_fab_metadata(fab.content)
if not isolation:
# If `ClientApp` runs in the same process, install the FAB
install_from_fab(fab.content, flwr_path, True)
fab_id, fab_version = get_fab_metadata(fab.content)

else:
fab = None
fab_id, fab_version = run.fab_id, run.fab_version
Expand Down
17 changes: 17 additions & 0 deletions src/py/flwr/client/supernode/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from flwr.common import EventType, event
from flwr.common.config import parse_config_args
from flwr.common.constant import (
FAB_MODE_AUTOINSTALL,
FAB_MODE_PREINSTALL,
FLEET_API_GRPC_RERE_DEFAULT_ADDRESS,
ISOLATION_MODE_PROCESS,
ISOLATION_MODE_SUBPROCESS,
Expand Down Expand Up @@ -87,6 +89,7 @@ def run_supernode() -> None:
flwr_path=args.flwr_dir,
isolation=args.isolation,
supernode_address=args.supernode_address,
fab_install_mode=args.fab_install_mode,
)

# Graceful shutdown
Expand Down Expand Up @@ -209,6 +212,20 @@ def _parse_args_run_supernode() -> argparse.ArgumentParser:
default="0.0.0.0:9094",
help="Set the SuperNode gRPC server address. Defaults to `0.0.0.0:9094`.",
)
parser.add_argument(
"--fab-install-mode",
default=FAB_MODE_AUTOINSTALL,
required=False,
choices=[
FAB_MODE_AUTOINSTALL,
FAB_MODE_PREINSTALL,
],
help="Set the FAB install mode when running a `ClientApp` (optional, possible "
"values: `autoinstall`, `preinstall`). By default, the FAB install mode is "
"`autoinstall`. Use `autoinstall` to automatically install the FAB when the "
"SuperNode starts. Use `preinstall` to indicate that the FAB is already "
"installed and the SuperNode should not attempt to install it.",
)

return parser

Expand Down
15 changes: 15 additions & 0 deletions src/py/flwr/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ def get_project_dir(
)


def get_project_dir_from_hash(
    fab_hash: str,
    flwr_dir: Optional[Union[str, Path]] = None,
) -> Path:
    """Return the installed project directory matching the given FAB hash.

    Every entry directly under ``<flwr_dir>/<APP_DIR>`` is inspected. The last
    dot-separated component of the entry's name is treated as a prefix of the
    FAB hash; the first directory (in sorted order) whose prefix matches
    ``fab_hash`` is returned.

    Parameters
    ----------
    fab_hash : str
        The full FAB hash. It must be exactly 64 characters long.
    flwr_dir : Optional[Union[str, Path]] (default: None)
        The Flower home directory to search in. If `None`, the directory
        returned by `get_flwr_dir()` is used.

    Returns
    -------
    Path
        The path of the matching project directory.

    Raises
    ------
    ValueError
        If `fab_hash` is not 64 characters long.
    FileNotFoundError
        If no directory matches `fab_hash`.
    """
    if len(fab_hash) != 64:
        raise ValueError(f"Invalid FAB hash: {fab_hash}")
    if flwr_dir is None:
        flwr_dir = get_flwr_dir()

    # Sort the candidates so the result is deterministic when several entries
    # match (glob order is otherwise unspecified).
    for subfolder in sorted(Path(flwr_dir).glob(f"{APP_DIR}/*")):
        # Only directories can be project directories; a stray file such as
        # `something.fab` must not match hashes that start with "fab".
        if not subfolder.is_dir():
            continue
        hash_prefix = subfolder.name.rpartition(".")[2]
        # An empty prefix (name ending in ".") would match every hash because
        # `str.startswith("")` is always true, so reject it explicitly.
        if hash_prefix and fab_hash.startswith(hash_prefix):
            return subfolder
    raise FileNotFoundError(f"Cannot find project directory for FAB hash: {fab_hash}")


def get_project_config(project_dir: Union[str, Path]) -> dict[str, Any]:
"""Return pyproject.toml in the given project directory."""
# Load pyproject.toml file
Expand Down
4 changes: 4 additions & 0 deletions src/py/flwr/common/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@
LOG_STREAM_INTERVAL = 0.5 # Log stream interval for `ExecServicer.StreamLogs`
LOG_UPLOAD_INTERVAL = 0.2 # Minimum interval between two log uploads

# FAB install mode
FAB_MODE_AUTOINSTALL = "autoinstall"
FAB_MODE_PREINSTALL = "preinstall"


class MessageType:
"""Message type."""
Expand Down
Loading