Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(framework) Remove RunTracker and logs thread from Executor #4391

Merged
merged 26 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e64d1bd
init
jafermarq Oct 24, 2024
c499f85
Merge branch 'main' into use-flwr-serverapp
jafermarq Oct 25, 2024
28e5272
not changing run status in scheduler thread
jafermarq Oct 25, 2024
df2c98e
making use of serverappinputs/outputs in flwr-serverapp
jafermarq Oct 25, 2024
cec3594
updates
jafermarq Oct 25, 2024
51001ce
w/ prev
jafermarq Oct 25, 2024
dc4f59d
tweak
jafermarq Oct 25, 2024
8aca04c
register run in exec for deployment
jafermarq Oct 25, 2024
d2ebc20
Merge branch 'main' into use-flwr-serverapp
jafermarq Oct 25, 2024
0669d2a
formatting tweaks
jafermarq Oct 25, 2024
2fc0126
w/ prev
jafermarq Oct 25, 2024
6862355
Merge branch 'main' into use-flwr-serverapp
jafermarq Oct 28, 2024
a19bde5
passing SSL certificates to `flwr-serverapp` when available
jafermarq Oct 28, 2024
51c98a5
w/ prev
jafermarq Oct 28, 2024
9b58a93
Merge branch 'main' into use-flwr-serverapp
jafermarq Oct 28, 2024
bb3a88d
Apply suggestions from code review
jafermarq Oct 28, 2024
475a066
Merge branch 'main' into use-flwr-serverapp
jafermarq Oct 28, 2024
dfbcaab
fixes
jafermarq Oct 28, 2024
92d33f4
merge fix
jafermarq Oct 28, 2024
40249e3
smaller diff
jafermarq Oct 29, 2024
08d5d31
init
jafermarq Oct 29, 2024
df8207d
fix to test
jafermarq Oct 29, 2024
a4e451d
w/ prev
jafermarq Oct 29, 2024
9d50ee7
pre merge
jafermarq Oct 29, 2024
b1fb045
Merge branch 'main' into remove-run-tracker-and-log-thread
jafermarq Oct 29, 2024
3c1d3f9
Merge branch 'main' into remove-run-tracker-and-log-thread
danieljanes Oct 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/py/flwr/superexec/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.linkstate import LinkState, LinkStateFactory

from .executor import Executor, RunTracker
from .executor import Executor


class DeploymentEngine(Executor):
Expand Down Expand Up @@ -141,7 +141,7 @@ def start_run(
fab_file: bytes,
override_config: UserConfig,
federation_config: UserConfig,
) -> Optional[RunTracker]:
) -> Optional[int]:
"""Start run using the Flower Deployment Engine."""
try:

Expand All @@ -151,7 +151,7 @@ def start_run(
)
log(INFO, "Created run %s", str(run_id))

return None
return run_id
# pylint: disable-next=broad-except
except Exception as e:
log(ERROR, "Could not start run: %s", str(e))
Expand Down
77 changes: 5 additions & 72 deletions src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
"""SuperExec API servicer."""


import select
import sys
import threading
import time
from collections.abc import Generator
from logging import ERROR, INFO
from typing import Any
Expand All @@ -37,7 +33,7 @@
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.linkstate import LinkStateFactory

from .executor import Executor, RunTracker
from .executor import Executor

SELECT_TIMEOUT = 1 # Timeout for selecting ready-to-read file descriptors (in seconds)

Expand All @@ -55,92 +51,29 @@ def __init__(
self.ffs_factory = ffs_factory
self.executor = executor
self.executor.initialize(linkstate_factory, ffs_factory)
self.runs: dict[int, RunTracker] = {}

def StartRun(
self, request: StartRunRequest, context: grpc.ServicerContext
) -> StartRunResponse:
"""Create run ID."""
log(INFO, "ExecServicer.StartRun")

run = self.executor.start_run(
run_id = self.executor.start_run(
request.fab.content,
user_config_from_proto(request.override_config),
user_config_from_proto(request.federation_config),
)

if run is None:
if run_id is None:
log(ERROR, "Executor failed to start run")
return StartRunResponse()

self.runs[run.run_id] = run

# Start a background thread to capture the log output
capture_thread = threading.Thread(
target=_capture_logs, args=(run,), daemon=True
)
capture_thread.start()

return StartRunResponse(run_id=run.run_id)
return StartRunResponse(run_id=run_id)

def StreamLogs( # pylint: disable=C0103
self, request: StreamLogsRequest, context: grpc.ServicerContext
) -> Generator[StreamLogsResponse, Any, None]:
"""Get logs."""
log(INFO, "ExecServicer.StreamLogs")

# Exit if `run_id` not found
if request.run_id not in self.runs:
context.abort(grpc.StatusCode.NOT_FOUND, "Run ID not found")

last_sent_index = 0
while context.is_active():
# Yield n'th row of logs, if n'th row < len(logs)
logs = self.runs[request.run_id].logs
for i in range(last_sent_index, len(logs)):
yield StreamLogsResponse(log_output=logs[i])
last_sent_index = len(logs)

# Wait for and continue to yield more log responses only if the
# run isn't completed yet. If the run is finished, the entire log
# is returned at this point and the server ends the stream.
if self.runs[request.run_id].proc.poll() is not None:
log(INFO, "All logs for run ID `%s` returned", request.run_id)
context.set_code(grpc.StatusCode.OK)
context.cancel()

time.sleep(1.0) # Sleep briefly to avoid busy waiting


def _capture_logs(
run: RunTracker,
) -> None:
while True:
# Explicitly check if Popen.poll() is None. Required for `pytest`.
if run.proc.poll() is None:
# Select streams only when ready to read
ready_to_read, _, _ = select.select(
[run.proc.stdout, run.proc.stderr],
[],
[],
SELECT_TIMEOUT,
)
# Read from std* and append to RunTracker.logs
for stream in ready_to_read:
# Flush stdout to view output in real time
readline = stream.readline()
sys.stdout.write(readline)
sys.stdout.flush()
# Append to logs
line = readline.rstrip()
if line:
run.logs.append(f"{line}")

# Close std* to prevent blocking
elif run.proc.poll() is not None:
log(INFO, "Subprocess finished, exiting log capture")
if run.proc.stdout:
run.proc.stdout.close()
if run.proc.stderr:
run.proc.stderr.close()
break
yield StreamLogsResponse()
22 changes: 2 additions & 20 deletions src/py/flwr/superexec/exec_servicer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611

from .exec_servicer import ExecServicer, _capture_logs
from .exec_servicer import ExecServicer


def test_start_run() -> None:
Expand All @@ -36,7 +36,7 @@ def test_start_run() -> None:
run_res.proc = proc

executor = MagicMock()
executor.start_run = lambda _, __, ___: run_res
executor.start_run = lambda _, __, ___: run_res.run_id

context_mock = MagicMock()

Expand All @@ -48,22 +48,4 @@ def test_start_run() -> None:

# Execute
response = servicer.StartRun(request, context_mock)

assert response.run_id == 10


def test_capture_logs() -> None:
"""Test capture_logs function."""
run_res = Mock()
run_res.logs = []
with subprocess.Popen(
["echo", "success"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) as proc:
run_res.proc = proc
_capture_logs(run_res)

assert len(run_res.logs) == 1
assert run_res.logs[0] == "success"
7 changes: 3 additions & 4 deletions src/py/flwr/superexec/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def start_run(
fab_file: bytes,
override_config: UserConfig,
federation_config: UserConfig,
) -> Optional[RunTracker]:
) -> Optional[int]:
"""Start a run using the given Flower FAB ID and version.

This method creates a new run on the SuperLink, returns its run_id
Expand All @@ -89,7 +89,6 @@ def start_run(

Returns
-------
run_id : Optional[RunTracker]
The run_id and the associated process of the run created by the SuperLink,
or `None` if it fails.
run_id : Optional[int]
The run_id of the run created by the SuperLink, or `None` if it fails.
"""
11 changes: 4 additions & 7 deletions src/py/flwr/superexec/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from flwr.server.superlink.linkstate import LinkStateFactory
from flwr.server.superlink.linkstate.utils import generate_rand_int_from_bytes

from .executor import Executor, RunTracker
from .executor import Executor


def _user_config_to_str(user_config: UserConfig) -> str:
Expand Down Expand Up @@ -125,7 +125,7 @@ def start_run(
fab_file: bytes,
override_config: UserConfig,
federation_config: UserConfig,
) -> Optional[RunTracker]:
) -> Optional[int]:
"""Start run using the Flower Simulation Engine."""
if self.num_supernodes is None:
raise ValueError(
Expand Down Expand Up @@ -199,17 +199,14 @@ def start_run(
command.extend(["--run-config", f"{override_config_str}"])

# Start Simulation
proc = subprocess.Popen( # pylint: disable=consider-using-with
_ = subprocess.Popen( # pylint: disable=consider-using-with
command,
text=True,
)

log(INFO, "Started run %s", str(run_id))

return RunTracker(
run_id=run_id,
proc=proc,
)
return run_id

# pylint: disable-next=broad-except
except Exception as e:
Expand Down