Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add StopRun to ExecServicer #4634

Merged
merged 14 commits into from
Dec 5, 2024
5 changes: 5 additions & 0 deletions src/proto/flwr/proto/exec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ service Exec {
// Start run upon request
rpc StartRun(StartRunRequest) returns (StartRunResponse) {}

// Stop run upon request
rpc StopRun(StopRunRequest) returns (StopRunResponse) {}

// Start log stream upon request
rpc StreamLogs(StreamLogsRequest) returns (stream StreamLogsResponse) {}

Expand All @@ -52,3 +55,5 @@ message ListRunsResponse {
map<uint64, Run> run_dict = 1;
string now = 2;
}
// Request for the StopRun RPC: identifies the run to stop.
message StopRunRequest {
  uint64 run_id = 1;
}
// Response for the StopRun RPC: `success` carries the outcome flag.
message StopRunResponse {
  bool success = 1;
}
10 changes: 7 additions & 3 deletions src/py/flwr/proto/exec_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions src/py/flwr/proto/exec_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,25 @@ class ListRunsResponse(google.protobuf.message.Message):
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["now",b"now","run_dict",b"run_dict"]) -> None: ...
global___ListRunsResponse = ListRunsResponse

class StopRunRequest(google.protobuf.message.Message):
    """Typed stub for the ``StopRunRequest`` message (single ``run_id`` field)."""

    DESCRIPTOR: google.protobuf.descriptor.Descriptor
    RUN_ID_FIELD_NUMBER: builtins.int
    run_id: builtins.int

    def __init__(self, *, run_id: builtins.int = ...) -> None: ...

    def ClearField(
        self,
        field_name: typing_extensions.Literal["run_id", b"run_id"],
    ) -> None: ...


global___StopRunRequest = StopRunRequest

class StopRunResponse(google.protobuf.message.Message):
    """Typed stub for the ``StopRunResponse`` message (single ``success`` field)."""

    DESCRIPTOR: google.protobuf.descriptor.Descriptor
    SUCCESS_FIELD_NUMBER: builtins.int
    success: builtins.bool

    def __init__(self, *, success: builtins.bool = ...) -> None: ...

    def ClearField(
        self,
        field_name: typing_extensions.Literal["success", b"success"],
    ) -> None: ...


global___StopRunResponse = StopRunResponse
34 changes: 34 additions & 0 deletions src/py/flwr/proto/exec_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=flwr_dot_proto_dot_exec__pb2.StartRunRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_exec__pb2.StartRunResponse.FromString,
)
self.StopRun = channel.unary_unary(
'/flwr.proto.Exec/StopRun',
request_serializer=flwr_dot_proto_dot_exec__pb2.StopRunRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_exec__pb2.StopRunResponse.FromString,
)
self.StreamLogs = channel.unary_stream(
'/flwr.proto.Exec/StreamLogs',
request_serializer=flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.SerializeToString,
Expand All @@ -41,6 +46,13 @@ def StartRun(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def StopRun(self, request, context):
    """Stop run upon request (base implementation).

    Marks the call as ``UNIMPLEMENTED`` on ``context`` and then raises
    ``NotImplementedError``.
    """
    not_implemented = 'Method not implemented!'
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details(not_implemented)
    raise NotImplementedError(not_implemented)

def StreamLogs(self, request, context):
"""Start log stream upon request
"""
Expand All @@ -63,6 +75,11 @@ def add_ExecServicer_to_server(servicer, server):
request_deserializer=flwr_dot_proto_dot_exec__pb2.StartRunRequest.FromString,
response_serializer=flwr_dot_proto_dot_exec__pb2.StartRunResponse.SerializeToString,
),
'StopRun': grpc.unary_unary_rpc_method_handler(
servicer.StopRun,
request_deserializer=flwr_dot_proto_dot_exec__pb2.StopRunRequest.FromString,
response_serializer=flwr_dot_proto_dot_exec__pb2.StopRunResponse.SerializeToString,
),
'StreamLogs': grpc.unary_stream_rpc_method_handler(
servicer.StreamLogs,
request_deserializer=flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.FromString,
Expand Down Expand Up @@ -100,6 +117,23 @@ def StartRun(request,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def StopRun(request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None):
    """Invoke ``/flwr.proto.Exec/StopRun`` on ``target``.

    Delegates to ``grpc.experimental.unary_unary`` and returns whatever
    that call returns; all keyword options are forwarded unchanged.
    """
    method_path = '/flwr.proto.Exec/StopRun'
    serialize_request = flwr_dot_proto_dot_exec__pb2.StopRunRequest.SerializeToString
    deserialize_response = flwr_dot_proto_dot_exec__pb2.StopRunResponse.FromString
    return grpc.experimental.unary_unary(
        request,
        target,
        method_path,
        serialize_request,
        deserialize_response,
        options,
        channel_credentials,
        insecure,
        call_credentials,
        compression,
        wait_for_ready,
        timeout,
        metadata,
    )

@staticmethod
def StreamLogs(request,
target,
Expand Down
13 changes: 13 additions & 0 deletions src/py/flwr/proto/exec_pb2_grpc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class ExecStub:
flwr.proto.exec_pb2.StartRunResponse]
"""Start run upon request"""

StopRun: grpc.UnaryUnaryMultiCallable[
flwr.proto.exec_pb2.StopRunRequest,
flwr.proto.exec_pb2.StopRunResponse]
"""Stop run upon request"""

StreamLogs: grpc.UnaryStreamMultiCallable[
flwr.proto.exec_pb2.StreamLogsRequest,
flwr.proto.exec_pb2.StreamLogsResponse]
Expand All @@ -34,6 +39,14 @@ class ExecServicer(metaclass=abc.ABCMeta):
"""Start run upon request"""
pass

@abc.abstractmethod
def StopRun(
    self,
    request: flwr.proto.exec_pb2.StopRunRequest,
    context: grpc.ServicerContext,
) -> flwr.proto.exec_pb2.StopRunResponse:
    """Handle a StopRun request and return the corresponding response."""
    ...

@abc.abstractmethod
def StreamLogs(self,
request: flwr.proto.exec_pb2.StreamLogsRequest,
Expand Down
29 changes: 28 additions & 1 deletion src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@
import grpc

from flwr.common import now
from flwr.common.constant import LOG_STREAM_INTERVAL, Status
from flwr.common.constant import LOG_STREAM_INTERVAL, Status, SubStatus
from flwr.common.logger import log
from flwr.common.serde import (
configs_record_from_proto,
run_to_proto,
user_config_from_proto,
)
from flwr.common.typing import RunStatus
from flwr.proto import exec_pb2_grpc # pylint: disable=E0611
from flwr.proto.exec_pb2 import ( # pylint: disable=E0611
ListRunsRequest,
ListRunsResponse,
StartRunRequest,
StartRunResponse,
StopRunRequest,
StopRunResponse,
StreamLogsRequest,
StreamLogsResponse,
)
Expand Down Expand Up @@ -126,6 +129,30 @@ def ListRuns(
# Handle `flwr ls --run-id <run_id>`
return _create_list_runs_response({request.run_id}, state)

def StopRun(
    self, request: StopRunRequest, context: grpc.ServicerContext
) -> StopRunResponse:
    """Mark the run given by ``request.run_id`` as finished/stopped.

    The RPC is aborted with ``NOT_FOUND`` when ``state.get_run`` yields a
    falsy value for the ID. A run whose status is already ``FINISHED`` is
    logged and answered with ``success=False``. Otherwise ``success``
    carries the return value of ``state.update_run_status``.
    """
    log(INFO, "ExecServicer.StopRun")
    run_id = request.run_id
    state = self.linkstate_factory.state()

    # Unknown run ID: terminate the RPC instead of returning a response
    if not state.get_run(run_id):
        context.abort(grpc.StatusCode.NOT_FOUND, "Run ID not found")

    current_status = state.get_run_status({run_id})[run_id]

    # Nothing to stop if the run has already reached its final status
    if current_status.status == Status.FINISHED:
        log(ERROR, "Run ID `%s` is already finished", run_id)
        return StopRunResponse(success=False)

    stopped_status = RunStatus(Status.FINISHED, SubStatus.STOPPED, "")
    was_updated = state.update_run_status(
        run_id=run_id,
        new_status=stopped_status,
    )
    return StopRunResponse(success=was_updated)


def _create_list_runs_response(run_ids: set[int], state: LinkState) -> ListRunsResponse:
"""Create response for `flwr ls --runs` and `flwr ls --run-id <run_id>`."""
Expand Down
25 changes: 25 additions & 0 deletions src/py/flwr/superexec/exec_servicer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
from unittest.mock import MagicMock, Mock

from flwr.common import ConfigsRecord, now
from flwr.common.constant import Status, SubStatus
from flwr.common.typing import RunStatus
from flwr.proto.exec_pb2 import ( # pylint: disable=E0611
ListRunsRequest,
StartRunRequest,
StopRunRequest,
)
from flwr.server.superlink.ffs.ffs_factory import FfsFactory
from flwr.server.superlink.linkstate import LinkStateFactory
Expand Down Expand Up @@ -104,3 +107,25 @@ def test_list_run_id(self) -> None:
# Assert
self.assertLess(abs(retrieved_timestamp - now().timestamp()), 1e-3)
self.assertEqual(set(response.run_dict.keys()), {run_id})

def test_stop_run(self) -> None:
    """Test StopRun method of ExecServicer.

    Creates a run in the link state, stops it through the servicer, and
    checks that the response reports success and the stored run carries
    the FINISHED/STOPPED status.
    """
    # Prepare
    # NOTE: no executor stub is installed here; `StopRun` updates the run
    # status through the link state and never calls the executor.
    run_id = self.state.create_run(
        "mock fabid", "mock fabver", "fake hash", {}, ConfigsRecord()
    )
    expected_run_status = RunStatus(Status.FINISHED, SubStatus.STOPPED, "")

    # Execute
    response = self.servicer.StopRun(StopRunRequest(run_id=run_id), Mock())
    run_state = self.state.get_run(run_id)

    # Assert
    self.assertTrue(response.success)
    self.assertIsNotNone(run_state)
    # The guard only narrows the Optional type; a None value has already
    # failed the assertion above.
    if run_state is not None:
        self.assertEqual(run_state.status, expected_run_status)