Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(framework) Add enter and leave event for Simulation and ServerApp #4716

Merged
merged 20 commits into from
Dec 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add serverapp events
chongshenng committed Dec 19, 2024

Verified

This commit was signed with the committer’s verified signature.
Freyskeyd Simon Paitrault
commit 7898c33611aa2e6601205a278c6eb1eea5d4dc1d
19 changes: 11 additions & 8 deletions src/py/flwr/cli/utils.py
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
from contextlib import contextmanager
from logging import DEBUG
from pathlib import Path
from typing import Any, Callable, Optional, cast
from typing import Any, Callable, Optional, Union, cast

import grpc
import typer
@@ -148,15 +148,18 @@ def sanitize_project_name(name: str) -> str:
return sanitized_name


def get_sha256_hash(file_path: Path) -> str:
def get_sha256_hash(file_path_or_int: Union[Path | int]) -> str:
"""Calculate the SHA-256 hash of a file."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
while True:
data = f.read(65536) # Read in 64kB blocks
if not data:
break
sha256.update(data)
if isinstance(file_path_or_int, Path):
with open(file_path_or_int, "rb") as f:
while True:
data = f.read(65536) # Read in 64kB blocks
if not data:
break
sha256.update(data)
elif isinstance(file_path_or_int, int):
sha256.update(str(file_path_or_int).encode())
return sha256.hexdigest()


4 changes: 4 additions & 0 deletions src/py/flwr/common/telemetry.py
Original file line number Diff line number Diff line change
@@ -175,6 +175,10 @@ def _generate_next_value_(name: str, start: int, count: int, last_values: list[A
RUN_SUPERNODE_ENTER = auto()
RUN_SUPERNODE_LEAVE = auto()

# CLI: flwr-serverapp
RUN_SERVERAPP_ENTER = auto()
RUN_SERVERAPP_LEAVE = auto()

# --- DEPRECATED -------------------------------------------------------------------

# [DEPRECATED] CLI: `flower-server-app`
15 changes: 13 additions & 2 deletions src/py/flwr/server/serverapp/app.py
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@

from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.cli.utils import get_sha256_hash
from flwr.common.args import add_args_flwr_app_common
from flwr.common.config import (
get_flwr_dir,
@@ -51,6 +52,7 @@
run_from_proto,
run_status_to_proto,
)
from flwr.common.telemetry import EventType, event
from flwr.common.typing import RunNotRunningException, RunStatus
from flwr.proto.run_pb2 import UpdateRunStatusRequest # pylint: disable=E0611
from flwr.proto.serverappio_pb2 import ( # pylint: disable=E0611
@@ -113,7 +115,7 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
# Resolve directory where FABs are installed
flwr_dir_ = get_flwr_dir(flwr_dir)
log_uploader = None

success = True
while True:

try:
@@ -129,6 +131,8 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
run = run_from_proto(res.run)
fab = fab_from_proto(res.fab)

hash_run_id = get_sha256_hash(run.run_id)

driver.set_run(run.run_id)

# Start log uploader for this run
@@ -171,6 +175,8 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto)
)

event(EventType.RUN_SERVERAPP_ENTER, event_details={"run-id": hash_run_id})

# Load and run the ServerApp with the Driver
updated_context = run_(
driver=driver,
@@ -187,17 +193,18 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
_ = driver._stub.PushServerAppOutputs(out_req)

run_status = RunStatus(Status.FINISHED, SubStatus.COMPLETED, "")

except RunNotRunningException:
log(INFO, "")
log(INFO, "Run ID %s stopped.", run.run_id)
log(INFO, "")
run_status = None
success = False

except Exception as ex: # pylint: disable=broad-exception-caught
exc_entity = "ServerApp"
log(ERROR, "%s raised an exception", exc_entity, exc_info=ex)
run_status = RunStatus(Status.FINISHED, SubStatus.FAILED, str(ex))
success = False

finally:
# Stop log uploader for this run and upload final logs
@@ -213,6 +220,10 @@ def run_serverapp( # pylint: disable=R0914, disable=W0212, disable=R0915
run_id=run.run_id, run_status=run_status_proto
)
)
event(
EventType.RUN_SERVERAPP_LEAVE,
event_details={"run-id": hash_run_id, "success": success},
)

# Stop the loop if `flwr-serverapp` is expected to process a single run
if run_once:
4 changes: 4 additions & 0 deletions src/py/flwr/simulation/app.py
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@

from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.cli.utils import get_sha256_hash
from flwr.common import EventType, event
from flwr.common.args import add_args_flwr_app_common
from flwr.common.config import (
@@ -137,6 +138,8 @@ def run_simulation_process( # pylint: disable=R0914, disable=W0212, disable=R09
run = run_from_proto(res.run)
fab = fab_from_proto(res.fab)

hash_run_id = get_sha256_hash(run.run_id)

# Start log uploader for this run
log_uploader = start_log_uploader(
log_queue=log_queue,
@@ -207,6 +210,7 @@ def run_simulation_process( # pylint: disable=R0914, disable=W0212, disable=R09
event_details={
"backend": "ray",
"num-supernodes": num_supernodes,
"run-id": hash_run_id,
danieljanes marked this conversation as resolved.
Show resolved Hide resolved
},
)

6 changes: 5 additions & 1 deletion src/py/flwr/simulation/run_simulation.py
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
from typing import Any, Optional

from flwr.cli.config_utils import load_and_validate
from flwr.cli.utils import get_sha256_hash
from flwr.client import ClientApp
from flwr.common import Context, EventType, RecordSet, event, log, now
from flwr.common.config import get_fused_config_from_dir, parse_config_args
@@ -394,7 +395,10 @@ def _main_loop(
finally:
# Trigger stop event
f_stop.set()
event(exit_event, event_details={"success": success})
event(
exit_event,
event_details={"run-id": get_sha256_hash(run.run_id), "success": success},
danieljanes marked this conversation as resolved.
Show resolved Hide resolved
)
if serverapp_th:
serverapp_th.join()
if server_app_thread_has_exception.is_set():