Skip to content

Commit

Permalink
refactor(framework) Remove RunTracker and logs thread from `Executo…
Browse files Browse the repository at this point in the history
…r` (#4391)

Co-authored-by: Heng Pan <pan@flower.ai>
  • Loading branch information
jafermarq and panh99 authored Oct 29, 2024
1 parent 183215b commit 7fc4261
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 106 deletions.
6 changes: 3 additions & 3 deletions src/py/flwr/superexec/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.linkstate import LinkState, LinkStateFactory

from .executor import Executor, RunTracker
from .executor import Executor


class DeploymentEngine(Executor):
Expand Down Expand Up @@ -141,7 +141,7 @@ def start_run(
fab_file: bytes,
override_config: UserConfig,
federation_config: UserConfig,
) -> Optional[RunTracker]:
) -> Optional[int]:
"""Start run using the Flower Deployment Engine."""
try:

Expand All @@ -151,7 +151,7 @@ def start_run(
)
log(INFO, "Created run %s", str(run_id))

return None
return run_id
# pylint: disable-next=broad-except
except Exception as e:
log(ERROR, "Could not start run: %s", str(e))
Expand Down
77 changes: 5 additions & 72 deletions src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
"""SuperExec API servicer."""


import select
import sys
import threading
import time
from collections.abc import Generator
from logging import ERROR, INFO
from typing import Any
Expand All @@ -37,7 +33,7 @@
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.linkstate import LinkStateFactory

from .executor import Executor, RunTracker
from .executor import Executor

SELECT_TIMEOUT = 1 # Timeout for selecting ready-to-read file descriptors (in seconds)

Expand All @@ -55,92 +51,29 @@ def __init__(
self.ffs_factory = ffs_factory
self.executor = executor
self.executor.initialize(linkstate_factory, ffs_factory)
self.runs: dict[int, RunTracker] = {}

def StartRun(
self, request: StartRunRequest, context: grpc.ServicerContext
) -> StartRunResponse:
"""Create run ID."""
log(INFO, "ExecServicer.StartRun")

run = self.executor.start_run(
run_id = self.executor.start_run(
request.fab.content,
user_config_from_proto(request.override_config),
user_config_from_proto(request.federation_config),
)

if run is None:
if run_id is None:
log(ERROR, "Executor failed to start run")
return StartRunResponse()

self.runs[run.run_id] = run

# Start a background thread to capture the log output
capture_thread = threading.Thread(
target=_capture_logs, args=(run,), daemon=True
)
capture_thread.start()

return StartRunResponse(run_id=run.run_id)
return StartRunResponse(run_id=run_id)

def StreamLogs( # pylint: disable=C0103
self, request: StreamLogsRequest, context: grpc.ServicerContext
) -> Generator[StreamLogsResponse, Any, None]:
"""Get logs."""
log(INFO, "ExecServicer.StreamLogs")

# Exit if `run_id` not found
if request.run_id not in self.runs:
context.abort(grpc.StatusCode.NOT_FOUND, "Run ID not found")

last_sent_index = 0
while context.is_active():
# Yield n'th row of logs, if n'th row < len(logs)
logs = self.runs[request.run_id].logs
for i in range(last_sent_index, len(logs)):
yield StreamLogsResponse(log_output=logs[i])
last_sent_index = len(logs)

# Wait for and continue to yield more log responses only if the
# run isn't completed yet. If the run is finished, the entire log
# is returned at this point and the server ends the stream.
if self.runs[request.run_id].proc.poll() is not None:
log(INFO, "All logs for run ID `%s` returned", request.run_id)
context.set_code(grpc.StatusCode.OK)
context.cancel()

time.sleep(1.0) # Sleep briefly to avoid busy waiting


def _capture_logs(
run: RunTracker,
) -> None:
while True:
# Explicitly check if Popen.poll() is None. Required for `pytest`.
if run.proc.poll() is None:
# Select streams only when ready to read
ready_to_read, _, _ = select.select(
[run.proc.stdout, run.proc.stderr],
[],
[],
SELECT_TIMEOUT,
)
# Read from std* and append to RunTracker.logs
for stream in ready_to_read:
# Flush stdout to view output in real time
readline = stream.readline()
sys.stdout.write(readline)
sys.stdout.flush()
# Append to logs
line = readline.rstrip()
if line:
run.logs.append(f"{line}")

# Close std* to prevent blocking
elif run.proc.poll() is not None:
log(INFO, "Subprocess finished, exiting log capture")
if run.proc.stdout:
run.proc.stdout.close()
if run.proc.stderr:
run.proc.stderr.close()
break
yield StreamLogsResponse()
22 changes: 2 additions & 20 deletions src/py/flwr/superexec/exec_servicer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611

from .exec_servicer import ExecServicer, _capture_logs
from .exec_servicer import ExecServicer


def test_start_run() -> None:
Expand All @@ -36,7 +36,7 @@ def test_start_run() -> None:
run_res.proc = proc

executor = MagicMock()
executor.start_run = lambda _, __, ___: run_res
executor.start_run = lambda _, __, ___: run_res.run_id

context_mock = MagicMock()

Expand All @@ -48,22 +48,4 @@ def test_start_run() -> None:

# Execute
response = servicer.StartRun(request, context_mock)

assert response.run_id == 10


def test_capture_logs() -> None:
"""Test capture_logs function."""
run_res = Mock()
run_res.logs = []
with subprocess.Popen(
["echo", "success"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
) as proc:
run_res.proc = proc
_capture_logs(run_res)

assert len(run_res.logs) == 1
assert run_res.logs[0] == "success"
7 changes: 3 additions & 4 deletions src/py/flwr/superexec/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def start_run(
fab_file: bytes,
override_config: UserConfig,
federation_config: UserConfig,
) -> Optional[RunTracker]:
) -> Optional[int]:
"""Start a run using the given Flower FAB ID and version.
This method creates a new run on the SuperLink, returns its run_id
Expand All @@ -89,7 +89,6 @@ def start_run(
Returns
-------
run_id : Optional[RunTracker]
The run_id and the associated process of the run created by the SuperLink,
or `None` if it fails.
run_id : Optional[int]
The run_id of the run created by the SuperLink, or `None` if it fails.
"""
11 changes: 4 additions & 7 deletions src/py/flwr/superexec/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from flwr.server.superlink.linkstate import LinkStateFactory
from flwr.server.superlink.linkstate.utils import generate_rand_int_from_bytes

from .executor import Executor, RunTracker
from .executor import Executor


def _user_config_to_str(user_config: UserConfig) -> str:
Expand Down Expand Up @@ -125,7 +125,7 @@ def start_run(
fab_file: bytes,
override_config: UserConfig,
federation_config: UserConfig,
) -> Optional[RunTracker]:
) -> Optional[int]:
"""Start run using the Flower Simulation Engine."""
if self.num_supernodes is None:
raise ValueError(
Expand Down Expand Up @@ -199,17 +199,14 @@ def start_run(
command.extend(["--run-config", f"{override_config_str}"])

# Start Simulation
proc = subprocess.Popen( # pylint: disable=consider-using-with
_ = subprocess.Popen( # pylint: disable=consider-using-with
command,
text=True,
)

log(INFO, "Started run %s", str(run_id))

return RunTracker(
run_id=run_id,
proc=proc,
)
return run_id

# pylint: disable-next=broad-except
except Exception as e:
Expand Down

0 comments on commit 7fc4261

Please sign in to comment.