Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Update ServerAppIoServicer RPCs for flwr stop #4629

Merged
merged 16 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/proto/flwr/proto/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ service Control {
// Request to create a new run
rpc CreateRun(CreateRunRequest) returns (CreateRunResponse) {}

// Get the status of a given run
rpc GetRunStatus(GetRunStatusRequest) returns (GetRunStatusResponse) {}

chongshenng marked this conversation as resolved.
Show resolved Hide resolved
// Update the status of a given run
rpc UpdateRunStatus(UpdateRunStatusRequest)
returns (UpdateRunStatusResponse) {}
Expand Down
8 changes: 8 additions & 0 deletions src/proto/flwr/proto/exec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ service Exec {
// Start run upon request
rpc StartRun(StartRunRequest) returns (StartRunResponse) {}

// Stop run upon request
rpc StopRun(StopRunRequest) returns (StopRunResponse) {}

// Start log stream upon request
rpc StreamLogs(StreamLogsRequest) returns (stream StreamLogsResponse) {}

Expand All @@ -52,3 +55,8 @@ message ListRunsResponse {
map<uint64, Run> run_dict = 1;
string now = 2;
}
message StopRunRequest {
uint64 run_id = 1;
Fab fab = 2;
chongshenng marked this conversation as resolved.
Show resolved Hide resolved
}
message StopRunResponse { bool success = 1; }
7 changes: 2 additions & 5 deletions src/proto/flwr/proto/run.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,8 @@ message UpdateRunStatusRequest {
message UpdateRunStatusResponse {}

// GetRunStatus
message GetRunStatusRequest {
Node node = 1;
repeated uint64 run_ids = 2;
}
message GetRunStatusResponse { map<uint64, RunStatus> run_status_dict = 1; }
message GetRunStatusRequest { uint64 run_id = 1; }
message GetRunStatusResponse { RunStatus run_status = 1; }

// Get Federation Options associated with run
message GetFederationOptionsRequest { uint64 run_id = 1; }
Expand Down
3 changes: 3 additions & 0 deletions src/proto/flwr/proto/serverappio.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ service ServerAppIo {
rpc UpdateRunStatus(UpdateRunStatusRequest)
returns (UpdateRunStatusResponse) {}

// Get the status of a given run
rpc GetRunStatus(GetRunStatusRequest) returns (GetRunStatusResponse) {}

// Push ServerApp logs
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
}
Expand Down
4 changes: 2 additions & 2 deletions src/py/flwr/proto/control_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 0 additions & 34 deletions src/py/flwr/proto/control_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ def __init__(self, channel):
request_serializer=flwr_dot_proto_dot_run__pb2.CreateRunRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_run__pb2.CreateRunResponse.FromString,
)
self.GetRunStatus = channel.unary_unary(
'/flwr.proto.Control/GetRunStatus',
request_serializer=flwr_dot_proto_dot_run__pb2.GetRunStatusRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_run__pb2.GetRunStatusResponse.FromString,
)
self.UpdateRunStatus = channel.unary_unary(
'/flwr.proto.Control/UpdateRunStatus',
request_serializer=flwr_dot_proto_dot_run__pb2.UpdateRunStatusRequest.SerializeToString,
Expand All @@ -41,13 +36,6 @@ def CreateRun(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetRunStatus(self, request, context):
"""Get the status of a given run
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def UpdateRunStatus(self, request, context):
"""Update the status of a given run
"""
Expand All @@ -63,11 +51,6 @@ def add_ControlServicer_to_server(servicer, server):
request_deserializer=flwr_dot_proto_dot_run__pb2.CreateRunRequest.FromString,
response_serializer=flwr_dot_proto_dot_run__pb2.CreateRunResponse.SerializeToString,
),
'GetRunStatus': grpc.unary_unary_rpc_method_handler(
servicer.GetRunStatus,
request_deserializer=flwr_dot_proto_dot_run__pb2.GetRunStatusRequest.FromString,
response_serializer=flwr_dot_proto_dot_run__pb2.GetRunStatusResponse.SerializeToString,
),
'UpdateRunStatus': grpc.unary_unary_rpc_method_handler(
servicer.UpdateRunStatus,
request_deserializer=flwr_dot_proto_dot_run__pb2.UpdateRunStatusRequest.FromString,
Expand Down Expand Up @@ -100,23 +83,6 @@ def CreateRun(request,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def GetRunStatus(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/flwr.proto.Control/GetRunStatus',
flwr_dot_proto_dot_run__pb2.GetRunStatusRequest.SerializeToString,
flwr_dot_proto_dot_run__pb2.GetRunStatusResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def UpdateRunStatus(request,
target,
Expand Down
13 changes: 0 additions & 13 deletions src/py/flwr/proto/control_pb2_grpc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ class ControlStub:
flwr.proto.run_pb2.CreateRunResponse]
"""Request to create a new run"""

GetRunStatus: grpc.UnaryUnaryMultiCallable[
flwr.proto.run_pb2.GetRunStatusRequest,
flwr.proto.run_pb2.GetRunStatusResponse]
"""Get the status of a given run"""

UpdateRunStatus: grpc.UnaryUnaryMultiCallable[
flwr.proto.run_pb2.UpdateRunStatusRequest,
flwr.proto.run_pb2.UpdateRunStatusResponse]
Expand All @@ -33,14 +28,6 @@ class ControlServicer(metaclass=abc.ABCMeta):
"""Request to create a new run"""
pass

@abc.abstractmethod
def GetRunStatus(self,
request: flwr.proto.run_pb2.GetRunStatusRequest,
context: grpc.ServicerContext,
) -> flwr.proto.run_pb2.GetRunStatusResponse:
"""Get the status of a given run"""
pass

@abc.abstractmethod
def UpdateRunStatus(self,
request: flwr.proto.run_pb2.UpdateRunStatusRequest,
Expand Down
10 changes: 7 additions & 3 deletions src/py/flwr/proto/exec_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions src/py/flwr/proto/exec_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,30 @@ class ListRunsResponse(google.protobuf.message.Message):
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["now",b"now","run_dict",b"run_dict"]) -> None: ...
global___ListRunsResponse = ListRunsResponse

class StopRunRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
RUN_ID_FIELD_NUMBER: builtins.int
FAB_FIELD_NUMBER: builtins.int
run_id: builtins.int
@property
def fab(self) -> flwr.proto.fab_pb2.Fab: ...
def __init__(self,
*,
run_id: builtins.int = ...,
fab: typing.Optional[flwr.proto.fab_pb2.Fab] = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["fab",b"fab"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["fab",b"fab","run_id",b"run_id"]) -> None: ...
global___StopRunRequest = StopRunRequest

class StopRunResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
SUCCESS_FIELD_NUMBER: builtins.int
success: builtins.bool
def __init__(self,
*,
success: builtins.bool = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["success",b"success"]) -> None: ...
global___StopRunResponse = StopRunResponse
34 changes: 34 additions & 0 deletions src/py/flwr/proto/exec_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=flwr_dot_proto_dot_exec__pb2.StartRunRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_exec__pb2.StartRunResponse.FromString,
)
self.StopRun = channel.unary_unary(
'/flwr.proto.Exec/StopRun',
request_serializer=flwr_dot_proto_dot_exec__pb2.StopRunRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_exec__pb2.StopRunResponse.FromString,
)
self.StreamLogs = channel.unary_stream(
'/flwr.proto.Exec/StreamLogs',
request_serializer=flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.SerializeToString,
Expand All @@ -41,6 +46,13 @@ def StartRun(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def StopRun(self, request, context):
"""Stop run upon request
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def StreamLogs(self, request, context):
"""Start log stream upon request
"""
Expand All @@ -63,6 +75,11 @@ def add_ExecServicer_to_server(servicer, server):
request_deserializer=flwr_dot_proto_dot_exec__pb2.StartRunRequest.FromString,
response_serializer=flwr_dot_proto_dot_exec__pb2.StartRunResponse.SerializeToString,
),
'StopRun': grpc.unary_unary_rpc_method_handler(
servicer.StopRun,
request_deserializer=flwr_dot_proto_dot_exec__pb2.StopRunRequest.FromString,
response_serializer=flwr_dot_proto_dot_exec__pb2.StopRunResponse.SerializeToString,
),
'StreamLogs': grpc.unary_stream_rpc_method_handler(
servicer.StreamLogs,
request_deserializer=flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.FromString,
Expand Down Expand Up @@ -100,6 +117,23 @@ def StartRun(request,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def StopRun(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/flwr.proto.Exec/StopRun',
flwr_dot_proto_dot_exec__pb2.StopRunRequest.SerializeToString,
flwr_dot_proto_dot_exec__pb2.StopRunResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def StreamLogs(request,
target,
Expand Down
13 changes: 13 additions & 0 deletions src/py/flwr/proto/exec_pb2_grpc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class ExecStub:
flwr.proto.exec_pb2.StartRunResponse]
"""Start run upon request"""

StopRun: grpc.UnaryUnaryMultiCallable[
flwr.proto.exec_pb2.StopRunRequest,
flwr.proto.exec_pb2.StopRunResponse]
"""Stop run upon request"""

StreamLogs: grpc.UnaryStreamMultiCallable[
flwr.proto.exec_pb2.StreamLogsRequest,
flwr.proto.exec_pb2.StreamLogsResponse]
Expand All @@ -34,6 +39,14 @@ class ExecServicer(metaclass=abc.ABCMeta):
"""Start run upon request"""
pass

@abc.abstractmethod
def StopRun(self,
request: flwr.proto.exec_pb2.StopRunRequest,
context: grpc.ServicerContext,
) -> flwr.proto.exec_pb2.StopRunResponse:
"""Stop run upon request"""
pass

@abc.abstractmethod
def StreamLogs(self,
request: flwr.proto.exec_pb2.StreamLogsRequest,
Expand Down
Loading
Loading