Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add GetFederationOptions to SimulationIo #4448

Merged
merged 12 commits into from
Nov 8, 2024
5 changes: 5 additions & 0 deletions src/proto/flwr/proto/run.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package flwr.proto;

import "flwr/proto/fab.proto";
import "flwr/proto/node.proto";
import "flwr/proto/recordset.proto";
import "flwr/proto/transport.proto";

message Run {
Expand Down Expand Up @@ -67,3 +68,7 @@ message GetRunStatusRequest {
repeated uint64 run_ids = 2;
}
message GetRunStatusResponse { map<uint64, RunStatus> run_status_dict = 1; }

// Get Federation Options associated with run
message GetFederationOptionsRequest { uint64 run_id = 1; }
message GetFederationOptionsResponse { ConfigsRecord federation_options = 1; }
4 changes: 4 additions & 0 deletions src/proto/flwr/proto/simulationio.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ service SimulationIo {

// Push ServerApp logs
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}

// Get Federation Options
rpc GetFederationOptions(GetFederationOptionsRequest)
returns (GetFederationOptionsResponse) {}
}

// PullSimulationInputs messages
Expand Down
59 changes: 32 additions & 27 deletions src/py/flwr/proto/run_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions src/py/flwr/proto/run_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ isort:skip_file
import builtins
import flwr.proto.fab_pb2
import flwr.proto.node_pb2
import flwr.proto.recordset_pb2
import flwr.proto.transport_pb2
import google.protobuf.descriptor
import google.protobuf.internal.containers
Expand Down Expand Up @@ -223,3 +224,28 @@ class GetRunStatusResponse(google.protobuf.message.Message):
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["run_status_dict",b"run_status_dict"]) -> None: ...
global___GetRunStatusResponse = GetRunStatusResponse

class GetFederationOptionsRequest(google.protobuf.message.Message):
"""Get Federation Options associated with run"""
DESCRIPTOR: google.protobuf.descriptor.Descriptor
RUN_ID_FIELD_NUMBER: builtins.int
run_id: builtins.int
def __init__(self,
*,
run_id: builtins.int = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["run_id",b"run_id"]) -> None: ...
global___GetFederationOptionsRequest = GetFederationOptionsRequest

class GetFederationOptionsResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FEDERATION_OPTIONS_FIELD_NUMBER: builtins.int
@property
def federation_options(self) -> flwr.proto.recordset_pb2.ConfigsRecord: ...
def __init__(self,
*,
federation_options: typing.Optional[flwr.proto.recordset_pb2.ConfigsRecord] = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["federation_options",b"federation_options"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["federation_options",b"federation_options"]) -> None: ...
global___GetFederationOptionsResponse = GetFederationOptionsResponse
4 changes: 2 additions & 2 deletions src/py/flwr/proto/simulationio_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions src/py/flwr/proto/simulationio_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ def __init__(self, channel):
request_serializer=flwr_dot_proto_dot_log__pb2.PushLogsRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_log__pb2.PushLogsResponse.FromString,
)
self.GetFederationOptions = channel.unary_unary(
'/flwr.proto.SimulationIo/GetFederationOptions',
request_serializer=flwr_dot_proto_dot_run__pb2.GetFederationOptionsRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_run__pb2.GetFederationOptionsResponse.FromString,
)


class SimulationIoServicer(object):
Expand Down Expand Up @@ -69,6 +74,13 @@ def PushLogs(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetFederationOptions(self, request, context):
"""Get Federation Options
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_SimulationIoServicer_to_server(servicer, server):
rpc_method_handlers = {
Expand All @@ -92,6 +104,11 @@ def add_SimulationIoServicer_to_server(servicer, server):
request_deserializer=flwr_dot_proto_dot_log__pb2.PushLogsRequest.FromString,
response_serializer=flwr_dot_proto_dot_log__pb2.PushLogsResponse.SerializeToString,
),
'GetFederationOptions': grpc.unary_unary_rpc_method_handler(
servicer.GetFederationOptions,
request_deserializer=flwr_dot_proto_dot_run__pb2.GetFederationOptionsRequest.FromString,
response_serializer=flwr_dot_proto_dot_run__pb2.GetFederationOptionsResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'flwr.proto.SimulationIo', rpc_method_handlers)
Expand Down Expand Up @@ -169,3 +186,20 @@ def PushLogs(request,
flwr_dot_proto_dot_log__pb2.PushLogsResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def GetFederationOptions(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/flwr.proto.SimulationIo/GetFederationOptions',
flwr_dot_proto_dot_run__pb2.GetFederationOptionsRequest.SerializeToString,
flwr_dot_proto_dot_run__pb2.GetFederationOptionsResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
13 changes: 13 additions & 0 deletions src/py/flwr/proto/simulationio_pb2_grpc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class SimulationIoStub:
flwr.proto.log_pb2.PushLogsResponse]
"""Push ServerApp logs"""

GetFederationOptions: grpc.UnaryUnaryMultiCallable[
flwr.proto.run_pb2.GetFederationOptionsRequest,
flwr.proto.run_pb2.GetFederationOptionsResponse]
"""Get Federation Options"""


class SimulationIoServicer(metaclass=abc.ABCMeta):
@abc.abstractmethod
Expand Down Expand Up @@ -64,5 +69,13 @@ class SimulationIoServicer(metaclass=abc.ABCMeta):
"""Push ServerApp logs"""
pass

@abc.abstractmethod
def GetFederationOptions(self,
request: flwr.proto.run_pb2.GetFederationOptionsRequest,
context: grpc.ServicerContext,
) -> flwr.proto.run_pb2.GetFederationOptionsResponse:
"""Get Federation Options"""
pass


def add_SimulationIoServicer_to_server(servicer: SimulationIoServicer, server: grpc.Server) -> None: ...
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from flwr.common.constant import Status
from flwr.common.logger import log
from flwr.common.serde import (
configs_record_to_proto,
context_from_proto,
context_to_proto,
fab_to_proto,
Expand All @@ -36,6 +37,8 @@
PushLogsResponse,
)
from flwr.proto.run_pb2 import ( # pylint: disable=E0611
GetFederationOptionsRequest,
GetFederationOptionsResponse,
UpdateRunStatusRequest,
UpdateRunStatusResponse,
)
Expand Down Expand Up @@ -123,10 +126,28 @@ def PushLogs(
self, request: PushLogsRequest, context: grpc.ServicerContext
) -> PushLogsResponse:
"""Push logs."""
log(DEBUG, "ServerAppIoServicer.PushLogs")
log(DEBUG, "SimultionIoServicer.PushLogs")
state = self.state_factory.state()

# Add logs to LinkState
merged_logs = "".join(request.logs)
state.add_serverapp_log(request.run_id, merged_logs)
return PushLogsResponse()

def GetFederationOptions(
self, request: GetFederationOptionsRequest, context: ServicerContext
) -> GetFederationOptionsResponse:
"""Get Federation Options associated with a run."""
log(DEBUG, "SimultionIoServicer.GetFederationOptions")
state = self.state_factory.state()

federation_options = state.get_federation_options(request.run_id)
if federation_options is None:
context.abort(
grpc.StatusCode.FAILED_PRECONDITION,
"Expected federation options to be set, but none available.",
)
return GetFederationOptionsResponse()
return GetFederationOptionsResponse(
federation_options=configs_record_to_proto(federation_options)
)