diff --git a/src/py/flwr/superexec/deployment.py b/src/py/flwr/superexec/deployment.py index f13cd2f8ea20..dce25a0f3e69 100644 --- a/src/py/flwr/superexec/deployment.py +++ b/src/py/flwr/superexec/deployment.py @@ -28,7 +28,7 @@ from flwr.server.superlink.ffs.ffs_factory import FfsFactory from flwr.server.superlink.linkstate import LinkState, LinkStateFactory -from .executor import Executor, RunTracker +from .executor import Executor class DeploymentEngine(Executor): @@ -141,7 +141,7 @@ def start_run( fab_file: bytes, override_config: UserConfig, federation_config: UserConfig, - ) -> Optional[RunTracker]: + ) -> Optional[int]: """Start run using the Flower Deployment Engine.""" try: @@ -151,7 +151,7 @@ def start_run( ) log(INFO, "Created run %s", str(run_id)) - return None + return run_id # pylint: disable-next=broad-except except Exception as e: log(ERROR, "Could not start run: %s", str(e)) diff --git a/src/py/flwr/superexec/exec_servicer.py b/src/py/flwr/superexec/exec_servicer.py index 14c1a3548047..a8d11b51f4d5 100644 --- a/src/py/flwr/superexec/exec_servicer.py +++ b/src/py/flwr/superexec/exec_servicer.py @@ -15,10 +15,6 @@ """SuperExec API servicer.""" -import select -import sys -import threading -import time from collections.abc import Generator from logging import ERROR, INFO from typing import Any @@ -37,7 +33,7 @@ from flwr.server.superlink.ffs.ffs_factory import FfsFactory from flwr.server.superlink.linkstate import LinkStateFactory -from .executor import Executor, RunTracker +from .executor import Executor SELECT_TIMEOUT = 1 # Timeout for selecting ready-to-read file descriptors (in seconds) @@ -55,7 +51,6 @@ def __init__( self.ffs_factory = ffs_factory self.executor = executor self.executor.initialize(linkstate_factory, ffs_factory) - self.runs: dict[int, RunTracker] = {} def StartRun( self, request: StartRunRequest, context: grpc.ServicerContext @@ -63,25 +58,17 @@ def StartRun( """Create run ID.""" log(INFO, "ExecServicer.StartRun") - run = self.executor.start_run( + run_id = self.executor.start_run( request.fab.content, user_config_from_proto(request.override_config), user_config_from_proto(request.federation_config), ) - if run is None: + if run_id is None: log(ERROR, "Executor failed to start run") return StartRunResponse() - self.runs[run.run_id] = run - - # Start a background thread to capture the log output - capture_thread = threading.Thread( - target=_capture_logs, args=(run,), daemon=True - ) - capture_thread.start() - - return StartRunResponse(run_id=run.run_id) + return StartRunResponse(run_id=run_id) def StreamLogs( # pylint: disable=C0103 self, request: StreamLogsRequest, context: grpc.ServicerContext @@ -89,58 +76,4 @@ def StreamLogs( # pylint: disable=C0103 """Get logs.""" log(INFO, "ExecServicer.StreamLogs") - # Exit if `run_id` not found - if request.run_id not in self.runs: - context.abort(grpc.StatusCode.NOT_FOUND, "Run ID not found") - - last_sent_index = 0 - while context.is_active(): - # Yield n'th row of logs, if n'th row < len(logs) - logs = self.runs[request.run_id].logs - for i in range(last_sent_index, len(logs)): - yield StreamLogsResponse(log_output=logs[i]) - last_sent_index = len(logs) - - # Wait for and continue to yield more log responses only if the - # run isn't completed yet. If the run is finished, the entire log - # is returned at this point and the server ends the stream. - if self.runs[request.run_id].proc.poll() is not None: - log(INFO, "All logs for run ID `%s` returned", request.run_id) - context.set_code(grpc.StatusCode.OK) - context.cancel() - - time.sleep(1.0) # Sleep briefly to avoid busy waiting - - -def _capture_logs( - run: RunTracker, -) -> None: - while True: - # Explicitly check if Popen.poll() is None. Required for `pytest`. - if run.proc.poll() is None: - # Select streams only when ready to read - ready_to_read, _, _ = select.select( - [run.proc.stdout, run.proc.stderr], - [], - [], - SELECT_TIMEOUT, - ) - # Read from std* and append to RunTracker.logs - for stream in ready_to_read: - # Flush stdout to view output in real time - readline = stream.readline() - sys.stdout.write(readline) - sys.stdout.flush() - # Append to logs - line = readline.rstrip() - if line: - run.logs.append(f"{line}") - - # Close std* to prevent blocking - elif run.proc.poll() is not None: - log(INFO, "Subprocess finished, exiting log capture") - if run.proc.stdout: - run.proc.stdout.close() - if run.proc.stderr: - run.proc.stderr.close() - break + yield StreamLogsResponse() diff --git a/src/py/flwr/superexec/exec_servicer_test.py b/src/py/flwr/superexec/exec_servicer_test.py index 6044895de3cf..3b50200d22f2 100644 --- a/src/py/flwr/superexec/exec_servicer_test.py +++ b/src/py/flwr/superexec/exec_servicer_test.py @@ -20,7 +20,7 @@ from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611 -from .exec_servicer import ExecServicer, _capture_logs +from .exec_servicer import ExecServicer def test_start_run() -> None: @@ -36,7 +36,7 @@ def test_start_run() -> None: run_res.proc = proc executor = MagicMock() - executor.start_run = lambda _, __, ___: run_res + executor.start_run = lambda _, __, ___: run_res.run_id context_mock = MagicMock() @@ -48,22 +48,4 @@ def test_start_run() -> None: # Execute response = servicer.StartRun(request, context_mock) - assert response.run_id == 10 - - -def test_capture_logs() -> None: - """Test capture_logs function.""" - run_res = Mock() - run_res.logs = [] - with subprocess.Popen( - ["echo", "success"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) as proc: - run_res.proc = proc - _capture_logs(run_res) - - assert len(run_res.logs) == 1 - assert run_res.logs[0] == "success" diff --git a/src/py/flwr/superexec/executor.py b/src/py/flwr/superexec/executor.py index a36e1dec0fd2..fd87b0d742be 100644 --- a/src/py/flwr/superexec/executor.py +++ b/src/py/flwr/superexec/executor.py @@ -72,7 +72,7 @@ def start_run( fab_file: bytes, override_config: UserConfig, federation_config: UserConfig, - ) -> Optional[RunTracker]: + ) -> Optional[int]: """Start a run using the given Flower FAB ID and version. This method creates a new run on the SuperLink, returns its run_id @@ -89,7 +89,6 @@ def start_run( Returns ------- - run_id : Optional[RunTracker] - The run_id and the associated process of the run created by the SuperLink, - or `None` if it fails. + run_id : Optional[int] + The run_id of the run created by the SuperLink, or `None` if it fails. """ diff --git a/src/py/flwr/superexec/simulation.py b/src/py/flwr/superexec/simulation.py index 83ea0d0681a1..3941b0c98bc6 100644 --- a/src/py/flwr/superexec/simulation.py +++ b/src/py/flwr/superexec/simulation.py @@ -33,7 +33,7 @@ from flwr.server.superlink.linkstate import LinkStateFactory from flwr.server.superlink.linkstate.utils import generate_rand_int_from_bytes -from .executor import Executor, RunTracker +from .executor import Executor def _user_config_to_str(user_config: UserConfig) -> str: @@ -125,7 +125,7 @@ def start_run( fab_file: bytes, override_config: UserConfig, federation_config: UserConfig, - ) -> Optional[RunTracker]: + ) -> Optional[int]: """Start run using the Flower Simulation Engine.""" if self.num_supernodes is None: raise ValueError( @@ -199,17 +199,14 @@ def start_run( command.extend(["--run-config", f"{override_config_str}"]) # Start Simulation - proc = subprocess.Popen( # pylint: disable=consider-using-with + _ = subprocess.Popen( # pylint: disable=consider-using-with command, text=True, ) log(INFO, "Started run %s", str(run_id)) - return RunTracker( - run_id=run_id, - proc=proc, - ) + return run_id # pylint: disable-next=broad-except except Exception as e: