diff --git a/protos/CMakeLists.txt b/protos/CMakeLists.txt index 65a403be..0206c61d 100644 --- a/protos/CMakeLists.txt +++ b/protos/CMakeLists.txt @@ -2,10 +2,13 @@ farm_ng_add_protobufs(farm_ng_amiga_proto_defs NAMESPACE farm_ng_amiga INCLUDE_DIRS ${Sophus_PROTO_DIR} + ${farm_ng_core_PROTO_DIR} PROTO_FILES - farm_ng/oak/oak.proto farm_ng/canbus/canbus.proto + farm_ng/oak/oak.proto + farm_ng/service/service.proto DEPENDENCIES Sophus::sophus_linalg_proto_defs Sophus::sophus_lie_proto_defs + farm_ng_core::farm_ng_core_proto_defs ) diff --git a/protos/farm_ng/canbus/canbus.proto b/protos/farm_ng/canbus/canbus.proto index c70afda5..df823834 100644 --- a/protos/farm_ng/canbus/canbus.proto +++ b/protos/farm_ng/canbus/canbus.proto @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - - syntax = "proto3"; package farm_ng.canbus.proto; @@ -22,34 +20,14 @@ service CanbusService { rpc streamCanbusMessages(StreamCanbusRequest) returns (stream StreamCanbusReply) {} rpc sendCanbusMessage(stream SendCanbusMessageRequest) - returns (SendCanbusMessageReply) {} - rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateResult) {} - rpc startService(StartServiceRequest) returns (StartServiceResult) {} - rpc stopService(StopServiceRequest) returns (StopServiceResult) {} - rpc pauseService(PauseServiceRequest) returns (PauseServiceResult) {} -} - -enum ReplyStatus { - OK = 0; - FAILED = 1; -} - -enum CanbusServiceState { - UNKNOWN = 0; - STOPPED = 1; - RUNNING = 2; - IDLE = 3; - UNAVAILABLE = 4; - ERROR = 5; + returns (stream SendCanbusMessageReply) {} } message StreamCanbusRequest { - string message = 1; } message StreamCanbusReply { - ReplyStatus status = 1; - RawCanbusMessages messages = 2; + RawCanbusMessages messages = 1; } message SendCanbusMessageRequest { @@ -57,7 +35,7 @@ message SendCanbusMessageRequest { } message SendCanbusMessageReply { - ReplyStatus status = 1; + bool success = 1; } message RawCanbusMessage { @@ -73,40 +51,3 @@ message RawCanbusMessage { message RawCanbusMessages { repeated RawCanbusMessage messages = 1; } - -message StartServiceRequest { - string message = 1; -} - -message StopServiceRequest { - string message = 1; -} - -message PauseServiceRequest { - string message = 1; -} - -message StartServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message StopServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message PauseServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message GetServiceStateRequest { - string message = 1; -} - -message GetServiceStateResult { - string state_name = 1; - CanbusServiceState state = 2; - ReplyStatus status = 3; -} diff --git a/protos/farm_ng/oak/oak.proto b/protos/farm_ng/oak/oak.proto index 527aca46..6b23694c 100644 --- a/protos/farm_ng/oak/oak.proto +++ b/protos/farm_ng/oak/oak.proto @@ -12,79 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. - - syntax = "proto3"; +import "farm_ng/service/service.proto"; + package farm_ng.oak.proto; service OakService { rpc cameraControl(CameraControlRequest) returns (CameraControlReply) {} rpc streamFrames(StreamFramesRequest) returns (stream StreamFramesReply) {} - rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateResult) {} - rpc startService(StartServiceRequest) returns (StartServiceResult) {} - rpc stopService(StopServiceRequest) returns (StopServiceResult) {} - rpc pauseService(PauseServiceRequest) returns (PauseServiceResult) {} - rpc getCalibration(GetCalibrationRequest) returns (GetCalibrationResult) {} -} - -enum ReplyStatus { - OK = 0; - FAILED = 1; -} - -enum OakServiceState { - UNKNOWN = 0; - STOPPED = 1; - RUNNING = 2; - IDLE = 3; - UNAVAILABLE = 4; + rpc getCalibration(GetCalibrationRequest) returns (GetCalibrationReply) {} } message GetCalibrationRequest { - string message = 1; } -message GetCalibrationResult { +message GetCalibrationReply { OakCalibration calibration = 1; - ReplyStatus status = 2; -} - -message StopServiceRequest { - string message = 1; -} - -message StopServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message StartServiceRequest { - string message = 1; -} - -message StartServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message PauseServiceRequest { - string message = 1; -} - -message PauseServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message GetServiceStateRequest { - string message = 1; -} - -message GetServiceStateResult { - string state_name = 1; - OakServiceState state = 2; - ReplyStatus status = 3; } message CameraControlRequest { @@ -92,11 +36,10 @@ message CameraControlRequest { CameraSettings rgb_settings = 2; } - message CameraControlReply { - ReplyStatus status = 1; - CameraSettings stereo_settings = 2; - CameraSettings rgb_settings = 3; + bool success = 1; + CameraSettings stereo_settings = 2; + CameraSettings rgb_settings = 3; } message StreamFramesRequest { @@ -104,8 +47,7 @@ message StreamFramesRequest { } message StreamFramesReply { - ReplyStatus status = 1; - OakSyncFrame frame = 2; + OakSyncFrame frame = 1; } message Vec3F32 { @@ -124,7 +66,7 @@ message CameraSettings { message OakImageMeta { int64 category = 1; // DepthAI catetory int64 instance_num = 2; //DepthAI instance number - int64 sequence_num = 3; // for synchronization between left, right and stereo + int64 sequence_num = 3; // for synchronization between left, right and stereo double timestamp = 4; // seconds, synchronized with host monotonic double timestamp_device = 5; // seconds, device monotonic clock CameraSettings settings = 6; @@ -216,7 +158,6 @@ message Extrinsics { Vector3d translation = 4; } - message CameraData { uint32 camera_number = 1; int32 camera_type = 2; diff --git a/protos/farm_ng/service/service.proto b/protos/farm_ng/service/service.proto new file mode 100644 index 00000000..776e0cd5 --- /dev/null +++ b/protos/farm_ng/service/service.proto @@ -0,0 +1,46 @@ +// Copyright (c) farm-ng, inc. +// +// Licensed under the Amiga Development Kit License (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "farm_ng/core/timestamp.proto"; + +package farm_ng.service.proto; + +service Service { + rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateReply) {} +} + +enum ServiceState { + // Service state is not known + UNKNOWN = 0; + // Running, and actively serving data + RUNNING = 1; + // Idle, waiting for clients + IDLE = 2; + // Service isn't running at all + UNAVAILABLE = 3; + // Service is in some error state + ERROR = 4; +} + +message GetServiceStateRequest { +} + +message GetServiceStateReply { + ServiceState state = 1; + uint32 pid = 2; // process id + double uptime = 3; + repeated farm_ng.core.proto.Timestamp timestamps = 4; +} diff --git a/py/farm_ng/canbus/canbus_client.py b/py/farm_ng/canbus/canbus_client.py index f8443e37..b50a007a 100644 --- a/py/farm_ng/canbus/canbus_client.py +++ b/py/farm_ng/canbus/canbus_client.py @@ -11,115 +11,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import asyncio import logging -from dataclasses import dataclass -import grpc from farm_ng.canbus import canbus_pb2 from farm_ng.canbus import canbus_pb2_grpc - +from farm_ng.service.service_client import ClientConfig +from farm_ng.service.service_client import ServiceClient logging.basicConfig(level=logging.INFO) -@dataclass -class CanbusClientConfig: - """Canbus client configuration. - - Attributes: - port (int): the port to connect to the server. - address (str): the address to connect to the server. - """ - - port: int # the port of the server address - address: str = "localhost" # the address name of the server - - -class CanbusServiceState: - """Canbus service state.""" - - def __init__(self, proto: canbus_pb2.CanbusServiceState = None) -> None: - self._proto = canbus_pb2.CanbusServiceState.UNAVAILABLE - if proto is not None: - self._proto = proto - - @property - def value(self) -> int: - return self._proto - - @property - def name(self) -> str: - return canbus_pb2.CanbusServiceState.DESCRIPTOR.values[self.value].name - - def __repr__(self) -> str: - return f"{self.__class__.__name__}: ({self.value}, {self.name})" - +class CanbusClient(ServiceClient): + """Amiga canbus client. -class CanbusClient: - def __init__(self, config: CanbusClientConfig) -> None: - self.config = config + Client class to connect with the Amiga brain canbus service. + Inherits from ServiceClient. - self.logger = logging.getLogger(self.__class__.__name__) + Args: + config (ClientConfig): the grpc configuration data structure. + """ + def __init__(self, config: ClientConfig) -> None: + super().__init__(config) # create a async connection with the server - self.channel = grpc.aio.insecure_channel(self.server_address) self.stub = canbus_pb2_grpc.CanbusServiceStub(self.channel) - self._state = CanbusServiceState() - - @property - def state(self) -> CanbusServiceState: - return self._state - - @property - def server_address(self) -> str: - """Returns the composed address and port.""" - return f"{self.config.address}:{self.config.port}" - - async def poll_service_state(self) -> None: - while True: - try: - self._state = await self.get_state() - await asyncio.sleep(0.1) - except asyncio.CancelledError: - self.logger.info("Got Cancelled Error") - break - - async def get_state(self) -> CanbusServiceState: - state: CanbusServiceState - try: - response: canbus_pb2.GetServiceStateResponse = await self.stub.getServiceState( - canbus_pb2.GetServiceStateRequest() - ) - state = CanbusServiceState(response.state) - except grpc.RpcError: - state = CanbusServiceState() - self.logger.debug("CanbusServiceStub: port -> %i state is: %s", self.config.port, state.name) - return state - - async def connect_to_service(self) -> None: - """Starts the canbus streaming. - - The service state will go to `RUNNING`. - """ - state: CanbusServiceState = await self.get_state() - if state.value == canbus_pb2.CanbusServiceState.UNAVAILABLE: - return - await self.stub.startService(canbus_pb2.StartServiceRequest()) - - async def disconnect_from_service(self) -> None: - state: CanbusServiceState = await self.get_state() - if state.value == canbus_pb2.CanbusServiceState.UNAVAILABLE: - return - await self.stub.stopService(canbus_pb2.StopServiceRequest()) - - async def pause_service(self) -> None: - """Pauses the canbus streaming. - - The service state will go from `RUNNING` to `IDLE`. - """ - state: CanbusServiceState = await self.get_state() - if state.value == canbus_pb2.CanbusServiceState.UNAVAILABLE: - return - await self.stub.pauseService(canbus_pb2.PauseServiceRequest()) + def stream(self): + """Return the async streaming object.""" + return self.stub.streamCanbusMessages(canbus_pb2.StreamCanbusRequest()) diff --git a/py/farm_ng/oak/camera_client.py b/py/farm_ng/oak/camera_client.py index 16603b11..4acb1cbd 100644 --- a/py/farm_ng/oak/camera_client.py +++ b/py/farm_ng/oak/camera_client.py @@ -14,13 +14,13 @@ import asyncio import logging import time -from dataclasses import dataclass -import grpc from farm_ng.oak import oak_pb2 from farm_ng.oak import oak_pb2_grpc +from farm_ng.service.service_client import ClientConfig +from farm_ng.service.service_client import ServiceClient -__all__ = ["OakCameraClientConfig", "OakCameraClient", "OakCameraServiceState"] +__all__ = ["OakCameraClient"] logging.basicConfig(level=logging.INFO) @@ -61,86 +61,27 @@ def next_call_wait(self): return self.period - (time.monotonic() - self.last_call) -@dataclass -class OakCameraClientConfig: - """Camera client configuration. - - Attributes: - port (int): the port to connect to the server. - address (str): the address to connect to the server. - """ - - port: int # the port of the server address - address: str = "localhost" # the address name of the server - - -class OakCameraServiceState: - """Camera service state. - - Possible state values: - - UNKNOWN: undefined state. - - RUNNING: the service is up AND streaming. - - IDLE: the service is up AND NOT streaming. - - UNAVAILABLE: the service is not available. - - Args: - proto (oak_pb2.OakServiceState): protobuf message containing the camera state. - """ - - def __init__(self, proto: oak_pb2.OakServiceState = None) -> None: - self._proto = oak_pb2.OakServiceState.UNAVAILABLE - if proto is not None: - self._proto = proto - - @property - def value(self) -> int: - """Returns the state enum value.""" - return self._proto - - @property - def name(self) -> str: - """Return the state name.""" - return oak_pb2.OakServiceState.DESCRIPTOR.values[self.value].name - - def __repr__(self) -> str: - return f"{self.__class__.__name__}: ({self.value}, {self.name})" - - -class OakCameraClient: +class OakCameraClient(ServiceClient): """Oak-D camera client. Client class to connect with the Amiga brain camera services. - Internally implements an `asyncio` gRPC channel. + Inherits from ServiceClient. Args: - config (OakCameraClientConfig): the camera configuration data structure. + config (ClientConfig): the grpc configuration data structure. """ - def __init__(self, config: OakCameraClientConfig) -> None: - self.config = config - - self.logger = logging.getLogger(self.__class__.__name__) + def __init__(self, config: ClientConfig) -> None: + super().__init__(config) - # create a async connection with the server - self.channel = grpc.aio.insecure_channel(self.server_address) + # create an async connection with the server self.stub = oak_pb2_grpc.OakServiceStub(self.channel) - self._state = OakCameraServiceState() - self._mono_camera_settings = oak_pb2.CameraSettings(auto_exposure=True) self._rgb_camera_settings = oak_pb2.CameraSettings(auto_exposure=True) self.needs_update = False - @property - def state(self) -> OakCameraServiceState: - return self._state - - @property - def server_address(self) -> str: - """Returns the composed address and port.""" - return f"{self.config.address}:{self.config.port}" - @property def rgb_settings(self) -> str: return self._rgb_camera_settings @@ -150,54 +91,10 @@ def mono_settings(self) -> str: return self._mono_camera_settings def settings_reply(self, reply) -> None: - if reply.status == oak_pb2.ReplyStatus.OK: + if reply.success: self._mono_camera_settings.CopyFrom(reply.stereo_settings) self._rgb_camera_settings.CopyFrom(reply.rgb_settings) - async def poll_service_state(self) -> None: - while True: - try: - self._state = await self.get_state() - await asyncio.sleep(0.1) - except asyncio.CancelledError: - self.logger.info("Got CancellededError") - break - - async def get_state(self) -> OakCameraServiceState: - """Async call to retrieve the state of the connected service.""" - state: OakCameraServiceState - try: - response: oak_pb2.GetServiceStateResponse = await self.stub.getServiceState( - oak_pb2.GetServiceStateRequest() - ) - state = OakCameraServiceState(response.state) - except grpc.RpcError: - state = OakCameraServiceState() - self.logger.debug("OakServiceStub: port -> %i state is: %s", self.config.port, state.name) - return state - - async def connect_to_service(self) -> None: - """Start the camera streaming. - - The service state will go from `IDLE` to `RUNNING`. - """ - state: OakCameraServiceState = await self.get_state() - if state.value == oak_pb2.OakServiceState.UNAVAILABLE: - return - reply = await self.stub.cameraControl(oak_pb2.CameraControlRequest()) - self.settings_reply(reply) - await self.stub.startService(oak_pb2.StartServiceRequest()) - - async def pause_service(self) -> None: - """Pauses the camera streaming. - - The service state will go from `RUNNING` to `IDLE`. - """ - state: OakCameraServiceState = await self.get_state() - if state.value == oak_pb2.OakServiceState.UNAVAILABLE: - return - await self.stub.pauseService(oak_pb2.PauseServiceRequest()) - async def send_settings(self) -> oak_pb2.CameraControlReply: request = oak_pb2.CameraControlRequest() request.stereo_settings.CopyFrom(self._mono_camera_settings) @@ -205,6 +102,14 @@ async def send_settings(self) -> oak_pb2.CameraControlReply: self.needs_update = False return await self.stub.cameraControl(request) + async def get_calibration(self) -> oak_pb2.GetCalibrationReply: + """Return the oak calibration as oak_pb2.GetCalibrationReply. + + Args: + request: proto defined request for oak calibration (oak_pb2.GetCalibrationRequest) + """ + return await self.stub.getCalibration(oak_pb2.GetCalibrationRequest()) + @RateLimiter(period=1) def update_rgb_settings(self, rgb_settings): self.needs_update = True diff --git a/py/farm_ng/service/service_client.py b/py/farm_ng/service/service_client.py new file mode 100644 index 00000000..9044799b --- /dev/null +++ b/py/farm_ng/service/service_client.py @@ -0,0 +1,104 @@ +# Copyright (c) farm-ng, inc. +# +# Licensed under the Amiga Development Kit License (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from dataclasses import dataclass + +import grpc +from farm_ng.service import service_pb2 +from farm_ng.service import service_pb2_grpc + + +@dataclass +class ClientConfig: + """Client configuration. + + Attributes: + port (int): the port to connect to the server. + address (str): the address to connect to the server. + """ + + port: int # the port of the server address + address: str = "localhost" # the address name of the server + + +class ServiceState: + """Generic service state. + + Possible state values: + - UNKNOWN: undefined state. + - RUNNING: the service is up AND streaming. + - IDLE: the service is up AND NOT streaming. + - UNAVAILABLE: the service is not available. + - ERROR: the service is an error state. + + Args: + proto (service_pb2.ServiceState): protobuf message containing the service state. + """ + + def __init__(self, proto: service_pb2.ServiceState = None) -> None: + self._proto = service_pb2.ServiceState.UNAVAILABLE + if proto is not None: + self._proto = proto + + @property + def value(self) -> int: + """Returns the state enum value.""" + return self._proto + + @property + def name(self) -> str: + """Return the state name.""" + return service_pb2.ServiceState.DESCRIPTOR.values[self.value].name + + def __repr__(self) -> str: + return f"{self.__class__.__name__}: ({self.value}, {self.name})" + + +class ServiceClient: + """Generic client. + + Generic client class to connect with the Amiga brain services. + Internally implements an `asyncio` gRPC channel. + Designed to be imported by service specific clients. + + Args: + config (ClientConfig): the grpc configuration data structure. + """ + + def __init__(self, config: ClientConfig) -> None: + print('config in ServiceClient', config) + self.config = config + + self.logger = logging.getLogger(self.__class__.__name__) + + # create an async connection with the server + self.channel = grpc.aio.insecure_channel(self.server_address) + self.state_stub = service_pb2_grpc.ServiceStub(self.channel) + + @property + def server_address(self) -> str: + """Returns the composed address and port.""" + return f"{self.config.address}:{self.config.port}" + + async def get_state(self) -> ServiceState: + state: ServiceState + try: + response: service_pb2.GetServiceStateReply = await self.state_stub.getServiceState( + service_pb2.GetServiceStateRequest() + ) + state = ServiceState(response.state) + except grpc.RpcError: + state = ServiceState() + self.logger.debug(f" {self.__class__.__name__} on port: %s state is: %s", self.config.port, state.name) + return state diff --git a/py/tests/test_canbus.py b/py/tests/test_canbus.py index 54d97126..73aadad8 100644 --- a/py/tests/test_canbus.py +++ b/py/tests/test_canbus.py @@ -12,29 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest -from farm_ng.canbus import canbus_pb2 from farm_ng.canbus.canbus_client import CanbusClient -from farm_ng.canbus.canbus_client import CanbusClientConfig -from farm_ng.canbus.canbus_client import CanbusServiceState +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig +from farm_ng.service.service_client import ServiceState @pytest.fixture(name="config") -def fixture_config() -> CanbusClientConfig: - return CanbusClientConfig(port=50051) +def fixture_config() -> ClientConfig: + return ClientConfig(port=50051) class TestCanbusClient: - def test_smoke_config(self, config: CanbusClientConfig) -> None: + def test_smoke_config(self, config: ClientConfig) -> None: assert config.port == 50051 assert config.address == "localhost" - def test_smoke(self, config: CanbusClientConfig) -> None: + def test_smoke(self, config: ClientConfig) -> None: client = CanbusClient(config) assert client is not None assert client.server_address == "localhost:50051" @pytest.mark.asyncio - async def test_state(self, config: CanbusClientConfig) -> None: + async def test_state(self, config: ClientConfig) -> None: client = CanbusClient(config) - state: CanbusServiceState = await client.get_state() - assert state.value == canbus_pb2.CanbusServiceState.UNAVAILABLE + state: ServiceState = await client.get_state() + assert state.value == service_pb2.ServiceState.UNAVAILABLE diff --git a/py/tests/test_oak.py b/py/tests/test_oak.py index 4b520d5a..8354ed43 100644 --- a/py/tests/test_oak.py +++ b/py/tests/test_oak.py @@ -12,29 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest -from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient -from farm_ng.oak.camera_client import OakCameraClientConfig -from farm_ng.oak.camera_client import OakCameraServiceState +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig +from farm_ng.service.service_client import ServiceState @pytest.fixture(name="config") -def fixture_config() -> OakCameraClientConfig: - return OakCameraClientConfig(port=50051) +def fixture_config() -> ClientConfig: + return ClientConfig(port=50051) class TestOakClient: - def test_smoke_config(self, config: OakCameraClientConfig) -> None: + def test_smoke_config(self, config: ClientConfig) -> None: assert config.port == 50051 assert config.address == "localhost" - def test_smoke(self, config: OakCameraClientConfig) -> None: + def test_smoke(self, config: ClientConfig) -> None: client = OakCameraClient(config) assert client is not None assert client.server_address == "localhost:50051" @pytest.mark.asyncio - async def test_state(self, config: OakCameraClientConfig) -> None: + async def test_state(self, config: ClientConfig) -> None: client = OakCameraClient(config) - state: OakCameraServiceState = await client.get_state() - assert state.value == oak_pb2.OakServiceState.UNAVAILABLE + state: ServiceState = await client.get_state() + assert state.value == service_pb2.ServiceState.UNAVAILABLE diff --git a/setup.cfg b/setup.cfg index 34f3052b..15de34e5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ python_requires = >=3.6 setup_requires = wheel sophus + farm-ng-core install_requires = protobuf grpcio @@ -43,6 +44,7 @@ packages = farm_ng farm_ng.canbus farm_ng.oak + farm_ng.service [options.extras_require] dev = @@ -63,14 +65,11 @@ files = py/sophus, py/tests, py/examples ignore_missing_imports = True [options.package_data] -farm_ng.controller = - *.proto - farm_ng.oak = *.proto farm_ng.canbus = *.proto -farm_ng.state_estimator = +farm_ng.service = *.proto