From 9ccffcb30d0ad5e250c29e88b70dba1f2c9d4846 Mon Sep 17 00:00:00 2001 From: edgar Date: Wed, 11 Jan 2023 15:44:41 +0400 Subject: [PATCH 01/23] simplify services api --- protos/farm_ng/canbus/canbus.proto | 31 +--------------------- protos/farm_ng/oak/oak.proto | 30 --------------------- py/farm_ng/canbus/canbus_client.py | 42 ------------------------------ py/farm_ng/oak/camera_client.py | 37 -------------------------- 4 files changed, 1 insertion(+), 139 deletions(-) diff --git a/protos/farm_ng/canbus/canbus.proto b/protos/farm_ng/canbus/canbus.proto index c70afda5..5066dd24 100644 --- a/protos/farm_ng/canbus/canbus.proto +++ b/protos/farm_ng/canbus/canbus.proto @@ -24,9 +24,6 @@ service CanbusService { rpc sendCanbusMessage(stream SendCanbusMessageRequest) returns (SendCanbusMessageReply) {} rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateResult) {} - rpc startService(StartServiceRequest) returns (StartServiceResult) {} - rpc stopService(StopServiceRequest) returns (StopServiceResult) {} - rpc pauseService(PauseServiceRequest) returns (PauseServiceResult) {} } enum ReplyStatus { @@ -43,6 +40,7 @@ enum CanbusServiceState { ERROR = 5; } +// NOTE: `message` not really necessary message StreamCanbusRequest { string message = 1; } @@ -74,33 +72,6 @@ message RawCanbusMessages { repeated RawCanbusMessage messages = 1; } -message StartServiceRequest { - string message = 1; -} - -message StopServiceRequest { - string message = 1; -} - -message PauseServiceRequest { - string message = 1; -} - -message StartServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message StopServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message PauseServiceResult { - string message = 1; - ReplyStatus status = 2; -} - message GetServiceStateRequest { string message = 1; } diff --git a/protos/farm_ng/oak/oak.proto b/protos/farm_ng/oak/oak.proto index 527aca46..5f388e8d 100644 --- a/protos/farm_ng/oak/oak.proto +++ b/protos/farm_ng/oak/oak.proto @@ -22,9 +22,6 @@ service OakService { rpc cameraControl(CameraControlRequest) returns (CameraControlReply) {} rpc streamFrames(StreamFramesRequest) returns (stream StreamFramesReply) {} rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateResult) {} - rpc startService(StartServiceRequest) returns (StartServiceResult) {} - rpc stopService(StopServiceRequest) returns (StopServiceResult) {} - rpc pauseService(PauseServiceRequest) returns (PauseServiceResult) {} rpc getCalibration(GetCalibrationRequest) returns (GetCalibrationResult) {} } @@ -50,33 +47,6 @@ message GetCalibrationResult { ReplyStatus status = 2; } -message StopServiceRequest { - string message = 1; -} - -message StopServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message StartServiceRequest { - string message = 1; -} - -message StartServiceResult { - string message = 1; - ReplyStatus status = 2; -} - -message PauseServiceRequest { - string message = 1; -} - -message PauseServiceResult { - string message = 1; - ReplyStatus status = 2; -} - message GetServiceStateRequest { string message = 1; } diff --git a/py/farm_ng/canbus/canbus_client.py b/py/farm_ng/canbus/canbus_client.py index f8443e37..e877b6a3 100644 --- a/py/farm_ng/canbus/canbus_client.py +++ b/py/farm_ng/canbus/canbus_client.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import asyncio import logging from dataclasses import dataclass @@ -66,26 +65,11 @@ def __init__(self, config: CanbusClientConfig) -> None: self.channel = grpc.aio.insecure_channel(self.server_address) self.stub = canbus_pb2_grpc.CanbusServiceStub(self.channel) - self._state = CanbusServiceState() - - @property - def state(self) -> CanbusServiceState: - return self._state - @property def server_address(self) -> str: """Returns the composed address and port.""" return f"{self.config.address}:{self.config.port}" - async def poll_service_state(self) -> None: - while True: - try: - self._state = await self.get_state() - await asyncio.sleep(0.1) - except asyncio.CancelledError: - self.logger.info("Got Cancelled Error") - break - async def get_state(self) -> CanbusServiceState: state: CanbusServiceState try: @@ -97,29 +81,3 @@ async def get_state(self) -> CanbusServiceState: state = CanbusServiceState() self.logger.debug("CanbusServiceStub: port -> %i state is: %s", self.config.port, state.name) return state - - async def connect_to_service(self) -> None: - """Starts the canbus streaming. - - The service state will go to `RUNNING`. - """ - state: CanbusServiceState = await self.get_state() - if state.value == canbus_pb2.CanbusServiceState.UNAVAILABLE: - return - await self.stub.startService(canbus_pb2.StartServiceRequest()) - - async def disconnect_from_service(self) -> None: - state: CanbusServiceState = await self.get_state() - if state.value == canbus_pb2.CanbusServiceState.UNAVAILABLE: - return - await self.stub.stopService(canbus_pb2.StopServiceRequest()) - - async def pause_service(self) -> None: - """Pauses the canbus streaming. - - The service state will go from `RUNNING` to `IDLE`. - """ - state: CanbusServiceState = await self.get_state() - if state.value == canbus_pb2.CanbusServiceState.UNAVAILABLE: - return - await self.stub.pauseService(canbus_pb2.PauseServiceRequest()) diff --git a/py/farm_ng/oak/camera_client.py b/py/farm_ng/oak/camera_client.py index 16603b11..10407ad2 100644 --- a/py/farm_ng/oak/camera_client.py +++ b/py/farm_ng/oak/camera_client.py @@ -125,17 +125,11 @@ def __init__(self, config: OakCameraClientConfig) -> None: self.channel = grpc.aio.insecure_channel(self.server_address) self.stub = oak_pb2_grpc.OakServiceStub(self.channel) - self._state = OakCameraServiceState() - self._mono_camera_settings = oak_pb2.CameraSettings(auto_exposure=True) self._rgb_camera_settings = oak_pb2.CameraSettings(auto_exposure=True) self.needs_update = False - @property - def state(self) -> OakCameraServiceState: - return self._state - @property def server_address(self) -> str: """Returns the composed address and port.""" @@ -154,15 +148,6 @@ def settings_reply(self, reply) -> None: self._mono_camera_settings.CopyFrom(reply.stereo_settings) self._rgb_camera_settings.CopyFrom(reply.rgb_settings) - async def poll_service_state(self) -> None: - while True: - try: - self._state = await self.get_state() - await asyncio.sleep(0.1) - except asyncio.CancelledError: - self.logger.info("Got CancellededError") - break - async def get_state(self) -> OakCameraServiceState: """Async call to retrieve the state of the connected service.""" state: OakCameraServiceState @@ -176,28 +161,6 @@ async def get_state(self) -> OakCameraServiceState: self.logger.debug("OakServiceStub: port -> %i state is: %s", self.config.port, state.name) return state - async def connect_to_service(self) -> None: - """Start the camera streaming. - - The service state will go from `IDLE` to `RUNNING`. - """ - state: OakCameraServiceState = await self.get_state() - if state.value == oak_pb2.OakServiceState.UNAVAILABLE: - return - reply = await self.stub.cameraControl(oak_pb2.CameraControlRequest()) - self.settings_reply(reply) - await self.stub.startService(oak_pb2.StartServiceRequest()) - - async def pause_service(self) -> None: - """Pauses the camera streaming. - - The service state will go from `RUNNING` to `IDLE`. - """ - state: OakCameraServiceState = await self.get_state() - if state.value == oak_pb2.OakServiceState.UNAVAILABLE: - return - await self.stub.pauseService(oak_pb2.PauseServiceRequest()) - async def send_settings(self) -> oak_pb2.CameraControlReply: request = oak_pb2.CameraControlRequest() request.stereo_settings.CopyFrom(self._mono_camera_settings) From 6f2cb02f8bc8610827a8a7162b7d0a3836f152eb Mon Sep 17 00:00:00 2001 From: edgar Date: Wed, 11 Jan 2023 17:59:43 +0400 Subject: [PATCH 02/23] fix test for farm-ng-core 0.1.4 --- py/tests/test_core.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/py/tests/test_core.py b/py/tests/test_core.py index 04516f03..c87c1de8 100644 --- a/py/tests/test_core.py +++ b/py/tests/test_core.py @@ -19,53 +19,58 @@ from farm_ng.oak import oak_pb2 -@pytest.fixture(name="log_file") +@pytest.fixture(name="log_base") def fixture_log_file(tmpdir) -> Path: - return Path(tmpdir) / "event.log" + return Path(tmpdir) / "event" + + +@pytest.fixture(name="reader_log_file") +def fixture_reader_log_file(tmpdir) -> Path: + return Path(tmpdir) / "event.0000.bin" class TestEventsWriter: - def test_smoke(self, log_file: Path) -> None: - with EventsFileWriter(log_file) as writer: + def test_smoke(self, log_base: Path) -> None: + with EventsFileWriter(log_base) as writer: assert writer.is_open() - def test_write(self, log_file: Path) -> None: - with EventsFileWriter(log_file) as writer: + def test_write(self, log_base: Path) -> None: + with EventsFileWriter(log_base) as writer: frame = oak_pb2.OakFrame() frame.image_data = bytes([1, 2, 3, 4, 5, 6, 7, 8, 9]) writer.write(path="oak/frame", message=frame) class TestEventsReader: - def test_smoke(self, log_file: Path) -> None: + def test_smoke(self, log_base: Path, reader_log_file: Path) -> None: # touch file - with EventsFileWriter(log_file) as _: + with EventsFileWriter(log_base) as _: pass - with EventsFileReader(log_file) as reader: + with EventsFileReader(reader_log_file) as reader: assert reader.is_open() - def test_write_read(self, log_file: Path) -> None: + def test_write_read(self, log_base: Path, reader_log_file: Path) -> None: def _make_frame(i): frame = oak_pb2.OakFrame() frame.image_data = bytes(i * [1, 2, 3, 4, 5, 6, 7, 8, 9]) return frame frames = [_make_frame(i + 1) for i in range(4)] - with EventsFileWriter(log_file) as writer: + with EventsFileWriter(log_base) as writer: writer.write(path="oak0/frame", message=frames[0]) writer.write(path="oak1/frame", message=frames[1]) writer.write(path="oak0/frame", message=frames[2]) writer.write(path="oak0/frame", message=frames[3]) # check time based - with EventsFileReader(log_file) as reader: + with EventsFileReader(reader_log_file) as reader: messages_stream = reader.read_messages() for i, (event, msg) in enumerate(messages_stream): assert msg.image_data == frames[i].image_data assert any(x in event.uri.path for x in ["oak0", "oak1"]) # check frame based - with EventsFileReader(log_file) as reader: + with EventsFileReader(reader_log_file) as reader: events = reader.get_index() oak0, oak1 = sorted([*{x.event.uri.path for x in events}]) oak0_events = [x for x in events if x.event.uri.path == oak0] From b21582a980e60b8264b0d1487c61c45f0501da51 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Thu, 12 Jan 2023 17:17:04 -0500 Subject: [PATCH 03/23] Add stream() wrapper on streamCanbusMessages --- py/farm_ng/canbus/canbus_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/py/farm_ng/canbus/canbus_client.py b/py/farm_ng/canbus/canbus_client.py index e877b6a3..b57057b9 100644 --- a/py/farm_ng/canbus/canbus_client.py +++ b/py/farm_ng/canbus/canbus_client.py @@ -81,3 +81,7 @@ async def get_state(self) -> CanbusServiceState: state = CanbusServiceState() self.logger.debug("CanbusServiceStub: port -> %i state is: %s", self.config.port, state.name) return state + + def stream(self): + """Return the async streaming object.""" + return self.stub.streamCanbusMessages(canbus_pb2.StreamCanbusRequest()) From fb7dab3eae3271dc0d69490ddd0121f8caca5fda Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Sat, 14 Jan 2023 13:15:18 -0500 Subject: [PATCH 04/23] Common ServiceState api --- protos/CMakeLists.txt | 3 +- protos/farm_ng/canbus/canbus.proto | 38 ++++----------------- protos/farm_ng/oak/oak.proto | 48 ++++++-------------------- protos/farm_ng/service/service.proto | 47 +++++++++++++++++++++++++ py/farm_ng/canbus/canbus_client.py | 35 +++++-------------- py/farm_ng/oak/camera_client.py | 51 ++++++---------------------- py/farm_ng/service/service.py | 48 ++++++++++++++++++++++++++ setup.cfg | 6 ++-- 8 files changed, 135 insertions(+), 141 deletions(-) create mode 100644 protos/farm_ng/service/service.proto create mode 100644 py/farm_ng/service/service.py diff --git a/protos/CMakeLists.txt b/protos/CMakeLists.txt index 65a403be..bd97369e 100644 --- a/protos/CMakeLists.txt +++ b/protos/CMakeLists.txt @@ -3,8 +3,9 @@ farm_ng_add_protobufs(farm_ng_amiga_proto_defs INCLUDE_DIRS ${Sophus_PROTO_DIR} PROTO_FILES - farm_ng/oak/oak.proto farm_ng/canbus/canbus.proto + farm_ng/oak/oak.proto + farm_ng/service/service.proto DEPENDENCIES Sophus::sophus_linalg_proto_defs Sophus::sophus_lie_proto_defs diff --git a/protos/farm_ng/canbus/canbus.proto b/protos/farm_ng/canbus/canbus.proto index 5066dd24..26e8a5c3 100644 --- a/protos/farm_ng/canbus/canbus.proto +++ b/protos/farm_ng/canbus/canbus.proto @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. - - syntax = "proto3"; +import "farm_ng/service/service.proto"; + package farm_ng.canbus.proto; service CanbusService { @@ -23,31 +23,15 @@ service CanbusService { returns (stream StreamCanbusReply) {} rpc sendCanbusMessage(stream SendCanbusMessageRequest) returns (SendCanbusMessageReply) {} - rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateResult) {} + rpc getServiceState(farm_ng.service.proto.GetServiceStateRequest) + returns (farm_ng.service.proto.GetServiceStateReply) {} } -enum ReplyStatus { - OK = 0; - FAILED = 1; -} - -enum CanbusServiceState { - UNKNOWN = 0; - STOPPED = 1; - RUNNING = 2; - IDLE = 3; - UNAVAILABLE = 4; - ERROR = 5; -} - -// NOTE: `message` not really necessary message StreamCanbusRequest { - string message = 1; } message StreamCanbusReply { - ReplyStatus status = 1; - RawCanbusMessages messages = 2; + RawCanbusMessages messages = 1; } message SendCanbusMessageRequest { @@ -55,7 +39,7 @@ message SendCanbusMessageRequest { } message SendCanbusMessageReply { - ReplyStatus status = 1; + farm_ng.service.proto.ReplyStatus status = 1; } message RawCanbusMessage { @@ -71,13 +55,3 @@ message RawCanbusMessage { message RawCanbusMessages { repeated RawCanbusMessage messages = 1; } - -message GetServiceStateRequest { - string message = 1; -} - -message GetServiceStateResult { - string state_name = 1; - CanbusServiceState state = 2; - ReplyStatus status = 3; -} diff --git a/protos/farm_ng/oak/oak.proto b/protos/farm_ng/oak/oak.proto index 5f388e8d..62b00810 100644 --- a/protos/farm_ng/oak/oak.proto +++ b/protos/farm_ng/oak/oak.proto @@ -12,49 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. - - syntax = "proto3"; +import "farm_ng/service/service.proto"; + package farm_ng.oak.proto; service OakService { rpc cameraControl(CameraControlRequest) returns (CameraControlReply) {} rpc streamFrames(StreamFramesRequest) returns (stream StreamFramesReply) {} - rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateResult) {} - rpc getCalibration(GetCalibrationRequest) returns (GetCalibrationResult) {} -} - -enum ReplyStatus { - OK = 0; - FAILED = 1; -} - -enum OakServiceState { - UNKNOWN = 0; - STOPPED = 1; - RUNNING = 2; - IDLE = 3; - UNAVAILABLE = 4; + rpc getCalibration(GetCalibrationRequest) returns (GetCalibrationReply) {} + rpc getServiceState(farm_ng.service.proto.GetServiceStateRequest) + returns (farm_ng.service.proto.GetServiceStateReply) {} } message GetCalibrationRequest { string message = 1; } -message GetCalibrationResult { +message GetCalibrationReply { OakCalibration calibration = 1; - ReplyStatus status = 2; -} - -message GetServiceStateRequest { - string message = 1; -} - -message GetServiceStateResult { - string state_name = 1; - OakServiceState state = 2; - ReplyStatus status = 3; } message CameraControlRequest { @@ -62,11 +39,10 @@ message CameraControlRequest { CameraSettings rgb_settings = 2; } - message CameraControlReply { - ReplyStatus status = 1; - CameraSettings stereo_settings = 2; - CameraSettings rgb_settings = 3; + farm_ng.service.proto.ReplyStatus status = 1; + CameraSettings stereo_settings = 2; + CameraSettings rgb_settings = 3; } message StreamFramesRequest { @@ -74,8 +50,7 @@ message StreamFramesRequest { } message StreamFramesReply { - ReplyStatus status = 1; - OakSyncFrame frame = 2; + OakSyncFrame frame = 1; } message Vec3F32 { @@ -94,7 +69,7 @@ message CameraSettings { message OakImageMeta { int64 category = 1; // DepthAI catetory int64 instance_num = 2; //DepthAI instance number - int64 sequence_num = 3; // for synchronization between left, right and stereo + int64 sequence_num = 3; // for synchronization between left, right and stereo double timestamp = 4; // seconds, synchronized with host monotonic double timestamp_device = 5; // seconds, device monotonic clock CameraSettings settings = 6; @@ -186,7 +161,6 @@ message Extrinsics { Vector3d translation = 4; } - message CameraData { uint32 camera_number = 1; int32 camera_type = 2; diff --git a/protos/farm_ng/service/service.proto b/protos/farm_ng/service/service.proto new file mode 100644 index 00000000..2025c138 --- /dev/null +++ b/protos/farm_ng/service/service.proto @@ -0,0 +1,47 @@ +// Copyright (c) farm-ng, inc. +// +// Licensed under the Amiga Development Kit License (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package farm_ng.service.proto; + +// TODO: Just use a bool where this is used? +enum ReplyStatus { + OK = 0; + FAILED = 1; +} + +enum ServiceState { + // Service state is not known + UNKNOWN = 0; + // Stopped + STOPPED = 1; + // Running, and actively serving data + RUNNING = 2; + // Idle, waiting for clients + IDLE = 3; + // Service isn't running at all + UNAVAILABLE = 4; + // Service is in some error state + ERROR = 5; +} + +message GetServiceStateRequest { +} + +message GetServiceStateReply { + ServiceState state = 1; + uint32 pid = 2; // process id + double uptime = 3; +} diff --git a/py/farm_ng/canbus/canbus_client.py b/py/farm_ng/canbus/canbus_client.py index b57057b9..363e88b8 100644 --- a/py/farm_ng/canbus/canbus_client.py +++ b/py/farm_ng/canbus/canbus_client.py @@ -17,7 +17,8 @@ import grpc from farm_ng.canbus import canbus_pb2 from farm_ng.canbus import canbus_pb2_grpc - +from farm_ng.service import service_pb2 +from farm_ng.service.service import ServiceState logging.basicConfig(level=logging.INFO) @@ -35,26 +36,6 @@ class CanbusClientConfig: address: str = "localhost" # the address name of the server -class CanbusServiceState: - """Canbus service state.""" - - def __init__(self, proto: canbus_pb2.CanbusServiceState = None) -> None: - self._proto = canbus_pb2.CanbusServiceState.UNAVAILABLE - if proto is not None: - self._proto = proto - - @property - def value(self) -> int: - return self._proto - - @property - def name(self) -> str: - return canbus_pb2.CanbusServiceState.DESCRIPTOR.values[self.value].name - - def __repr__(self) -> str: - return f"{self.__class__.__name__}: ({self.value}, {self.name})" - - class CanbusClient: def __init__(self, config: CanbusClientConfig) -> None: self.config = config @@ -70,15 +51,15 @@ def server_address(self) -> str: """Returns the composed address and port.""" return f"{self.config.address}:{self.config.port}" - async def get_state(self) -> CanbusServiceState: - state: CanbusServiceState + async def get_state(self) -> ServiceState: + state: ServiceState try: - response: canbus_pb2.GetServiceStateResponse = await self.stub.getServiceState( - canbus_pb2.GetServiceStateRequest() + response: service_pb2.GetServiceStateReply = await self.stub.getServiceState( + service_pb2.GetServiceStateRequest() ) - state = CanbusServiceState(response.state) + state = ServiceState(response.state) except grpc.RpcError: - state = CanbusServiceState() + state = ServiceState() self.logger.debug("CanbusServiceStub: port -> %i state is: %s", self.config.port, state.name) return state diff --git a/py/farm_ng/oak/camera_client.py b/py/farm_ng/oak/camera_client.py index 10407ad2..15c2702d 100644 --- a/py/farm_ng/oak/camera_client.py +++ b/py/farm_ng/oak/camera_client.py @@ -19,8 +19,11 @@ import grpc from farm_ng.oak import oak_pb2 from farm_ng.oak import oak_pb2_grpc +from farm_ng.service import service_pb2 +from farm_ng.service.service import ServiceState -__all__ = ["OakCameraClientConfig", "OakCameraClient", "OakCameraServiceState"] + +__all__ = ["OakCameraClientConfig", "OakCameraClient"] logging.basicConfig(level=logging.INFO) @@ -74,38 +77,6 @@ class OakCameraClientConfig: address: str = "localhost" # the address name of the server -class OakCameraServiceState: - """Camera service state. - - Possible state values: - - UNKNOWN: undefined state. - - RUNNING: the service is up AND streaming. - - IDLE: the service is up AND NOT streaming. - - UNAVAILABLE: the service is not available. - - Args: - proto (oak_pb2.OakServiceState): protobuf message containing the camera state. - """ - - def __init__(self, proto: oak_pb2.OakServiceState = None) -> None: - self._proto = oak_pb2.OakServiceState.UNAVAILABLE - if proto is not None: - self._proto = proto - - @property - def value(self) -> int: - """Returns the state enum value.""" - return self._proto - - @property - def name(self) -> str: - """Return the state name.""" - return oak_pb2.OakServiceState.DESCRIPTOR.values[self.value].name - - def __repr__(self) -> str: - return f"{self.__class__.__name__}: ({self.value}, {self.name})" - - class OakCameraClient: """Oak-D camera client. @@ -144,20 +115,20 @@ def mono_settings(self) -> str: return self._mono_camera_settings def settings_reply(self, reply) -> None: - if reply.status == oak_pb2.ReplyStatus.OK: + if reply.status == service_pb2.ReplyStatus.OK: self._mono_camera_settings.CopyFrom(reply.stereo_settings) self._rgb_camera_settings.CopyFrom(reply.rgb_settings) - async def get_state(self) -> OakCameraServiceState: + async def get_state(self) -> ServiceState: """Async call to retrieve the state of the connected service.""" - state: OakCameraServiceState + state: ServiceState try: - response: oak_pb2.GetServiceStateResponse = await self.stub.getServiceState( - oak_pb2.GetServiceStateRequest() + response: service_pb2.GetServiceStateReply = await self.stub.getServiceState( + service_pb2.GetServiceStateRequest() ) - state = OakCameraServiceState(response.state) + state = ServiceState(response.state) except grpc.RpcError: - state = OakCameraServiceState() + state = ServiceState() self.logger.debug("OakServiceStub: port -> %i state is: %s", self.config.port, state.name) return state diff --git a/py/farm_ng/service/service.py b/py/farm_ng/service/service.py new file mode 100644 index 00000000..23cb0e55 --- /dev/null +++ b/py/farm_ng/service/service.py @@ -0,0 +1,48 @@ +# Copyright (c) farm-ng, inc. +# +# Licensed under the Amiga Development Kit License (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from farm_ng.service import service_pb2 + + +class ServiceState: + """Generic service state. + + Possible state values: + - UNKNOWN: undefined state. + - STOPPED: the service is stopped + - RUNNING: the service is up AND streaming. + - IDLE: the service is up AND NOT streaming. + - UNAVAILABLE: the service is not available. + - ERROR: the service is an error state. + + Args: + proto (service_pb2.ServiceState): protobuf message containing the service state. + """ + + def __init__(self, proto: service_pb2.ServiceState = None) -> None: + self._proto = service_pb2.ServiceState.UNAVAILABLE + if proto is not None: + self._proto = proto + + @property + def value(self) -> int: + """Returns the state enum value.""" + return self._proto + + @property + def name(self) -> str: + """Return the state name.""" + return service_pb2.ServiceState.DESCRIPTOR.values[self.value].name + + def __repr__(self) -> str: + return f"{self.__class__.__name__}: ({self.value}, {self.name})" diff --git a/setup.cfg b/setup.cfg index 34f3052b..661e1006 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ packages = farm_ng farm_ng.canbus farm_ng.oak + farm_ng.service [options.extras_require] dev = @@ -63,14 +64,11 @@ files = py/sophus, py/tests, py/examples ignore_missing_imports = True [options.package_data] -farm_ng.controller = - *.proto - farm_ng.oak = *.proto farm_ng.canbus = *.proto -farm_ng.state_estimator = +farm_ng.service = *.proto From 83b7c3c3be4272346398daf32288f1866c3d8399 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Mon, 16 Jan 2023 10:57:29 -0500 Subject: [PATCH 05/23] Remove STOPPED service state --- protos/farm_ng/service/service.proto | 10 ++++------ py/farm_ng/oak/camera_client.py | 2 +- py/farm_ng/service/service.py | 1 - 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/protos/farm_ng/service/service.proto b/protos/farm_ng/service/service.proto index 2025c138..442739c6 100644 --- a/protos/farm_ng/service/service.proto +++ b/protos/farm_ng/service/service.proto @@ -25,16 +25,14 @@ enum ReplyStatus { enum ServiceState { // Service state is not known UNKNOWN = 0; - // Stopped - STOPPED = 1; // Running, and actively serving data - RUNNING = 2; + RUNNING = 1; // Idle, waiting for clients - IDLE = 3; + IDLE = 2; // Service isn't running at all - UNAVAILABLE = 4; + UNAVAILABLE = 3; // Service is in some error state - ERROR = 5; + ERROR = 4; } message GetServiceStateRequest { diff --git a/py/farm_ng/oak/camera_client.py b/py/farm_ng/oak/camera_client.py index 15c2702d..e846115d 100644 --- a/py/farm_ng/oak/camera_client.py +++ b/py/farm_ng/oak/camera_client.py @@ -92,7 +92,7 @@ def __init__(self, config: OakCameraClientConfig) -> None: self.logger = logging.getLogger(self.__class__.__name__) - # create a async connection with the server + # create an async connection with the server self.channel = grpc.aio.insecure_channel(self.server_address) self.stub = oak_pb2_grpc.OakServiceStub(self.channel) diff --git a/py/farm_ng/service/service.py b/py/farm_ng/service/service.py index 23cb0e55..cb2410ab 100644 --- a/py/farm_ng/service/service.py +++ b/py/farm_ng/service/service.py @@ -19,7 +19,6 @@ class ServiceState: Possible state values: - UNKNOWN: undefined state. - - STOPPED: the service is stopped - RUNNING: the service is up AND streaming. - IDLE: the service is up AND NOT streaming. - UNAVAILABLE: the service is not available. From 27a1aecd5b7c900b1c9883f94f1fb26b3947000e Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Mon, 16 Jan 2023 16:36:32 -0500 Subject: [PATCH 06/23] Add repeated Timestamp to ServiceStateReply --- protos/CMakeLists.txt | 2 ++ protos/farm_ng/service/service.proto | 3 +++ setup.cfg | 1 + 3 files changed, 6 insertions(+) diff --git a/protos/CMakeLists.txt b/protos/CMakeLists.txt index bd97369e..0206c61d 100644 --- a/protos/CMakeLists.txt +++ b/protos/CMakeLists.txt @@ -2,6 +2,7 @@ farm_ng_add_protobufs(farm_ng_amiga_proto_defs NAMESPACE farm_ng_amiga INCLUDE_DIRS ${Sophus_PROTO_DIR} + ${farm_ng_core_PROTO_DIR} PROTO_FILES farm_ng/canbus/canbus.proto farm_ng/oak/oak.proto @@ -9,4 +10,5 @@ farm_ng_add_protobufs(farm_ng_amiga_proto_defs DEPENDENCIES Sophus::sophus_linalg_proto_defs Sophus::sophus_lie_proto_defs + farm_ng_core::farm_ng_core_proto_defs ) diff --git a/protos/farm_ng/service/service.proto b/protos/farm_ng/service/service.proto index 442739c6..aeffc651 100644 --- a/protos/farm_ng/service/service.proto +++ b/protos/farm_ng/service/service.proto @@ -14,6 +14,8 @@ syntax = "proto3"; +import "farm_ng/core/timestamp.proto"; + package farm_ng.service.proto; // TODO: Just use a bool where this is used? @@ -42,4 +44,5 @@ message GetServiceStateReply { ServiceState state = 1; uint32 pid = 2; // process id double uptime = 3; + repeated farm_ng.core.proto.Timestamp timestamps = 4; } diff --git a/setup.cfg b/setup.cfg index 661e1006..15de34e5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ python_requires = >=3.6 setup_requires = wheel sophus + farm-ng-core install_requires = protobuf grpcio From b1fed5ab20cf855771ffcc2825f5981f53ead790 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Mon, 16 Jan 2023 17:47:04 -0500 Subject: [PATCH 07/23] getServiceState as service.proto defined rpc --- protos/farm_ng/canbus/canbus.proto | 4 ---- protos/farm_ng/oak/oak.proto | 2 -- protos/farm_ng/service/service.proto | 4 ++++ py/farm_ng/canbus/canbus_client.py | 5 ++++- py/farm_ng/oak/camera_client.py | 8 ++++++-- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/protos/farm_ng/canbus/canbus.proto b/protos/farm_ng/canbus/canbus.proto index 26e8a5c3..b7b58a96 100644 --- a/protos/farm_ng/canbus/canbus.proto +++ b/protos/farm_ng/canbus/canbus.proto @@ -14,8 +14,6 @@ syntax = "proto3"; -import "farm_ng/service/service.proto"; - package farm_ng.canbus.proto; service CanbusService { @@ -23,8 +21,6 @@ service CanbusService { returns (stream StreamCanbusReply) {} rpc sendCanbusMessage(stream SendCanbusMessageRequest) returns (SendCanbusMessageReply) {} - rpc getServiceState(farm_ng.service.proto.GetServiceStateRequest) - returns (farm_ng.service.proto.GetServiceStateReply) {} } message StreamCanbusRequest { diff --git a/protos/farm_ng/oak/oak.proto b/protos/farm_ng/oak/oak.proto index 62b00810..f9843ebe 100644 --- a/protos/farm_ng/oak/oak.proto +++ b/protos/farm_ng/oak/oak.proto @@ -22,8 +22,6 @@ service OakService { rpc cameraControl(CameraControlRequest) returns (CameraControlReply) {} rpc streamFrames(StreamFramesRequest) returns (stream StreamFramesReply) {} rpc getCalibration(GetCalibrationRequest) returns (GetCalibrationReply) {} - rpc getServiceState(farm_ng.service.proto.GetServiceStateRequest) - returns (farm_ng.service.proto.GetServiceStateReply) {} } message GetCalibrationRequest { diff --git a/protos/farm_ng/service/service.proto b/protos/farm_ng/service/service.proto index aeffc651..65096edf 100644 --- a/protos/farm_ng/service/service.proto +++ b/protos/farm_ng/service/service.proto @@ -18,6 +18,10 @@ import "farm_ng/core/timestamp.proto"; package farm_ng.service.proto; +service Service { + rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateReply) {} +} + // TODO: Just use a bool where this is used? enum ReplyStatus { OK = 0; diff --git a/py/farm_ng/canbus/canbus_client.py b/py/farm_ng/canbus/canbus_client.py index 363e88b8..9ba9fa9b 100644 --- a/py/farm_ng/canbus/canbus_client.py +++ b/py/farm_ng/canbus/canbus_client.py @@ -18,6 +18,7 @@ from farm_ng.canbus import canbus_pb2 from farm_ng.canbus import canbus_pb2_grpc from farm_ng.service import service_pb2 +from farm_ng.service import service_pb2_grpc from farm_ng.service.service import ServiceState logging.basicConfig(level=logging.INFO) @@ -45,16 +46,18 @@ def __init__(self, config: CanbusClientConfig) -> None: # create a async connection with the server self.channel = grpc.aio.insecure_channel(self.server_address) self.stub = canbus_pb2_grpc.CanbusServiceStub(self.channel) + self.state_stub = service_pb2_grpc.ServiceStub(self.channel) @property def server_address(self) -> str: """Returns the composed address and port.""" return f"{self.config.address}:{self.config.port}" + # TODO: Defined by ServiceMonitorClient async def get_state(self) -> ServiceState: state: ServiceState try: - response: service_pb2.GetServiceStateReply = await self.stub.getServiceState( + response: service_pb2.GetServiceStateReply = await self.state_stub.getServiceState( service_pb2.GetServiceStateRequest() ) state = ServiceState(response.state) diff --git a/py/farm_ng/oak/camera_client.py b/py/farm_ng/oak/camera_client.py index e846115d..79f3d7ec 100644 --- a/py/farm_ng/oak/camera_client.py +++ b/py/farm_ng/oak/camera_client.py @@ -20,6 +20,7 @@ from farm_ng.oak import oak_pb2 from farm_ng.oak import oak_pb2_grpc from farm_ng.service import service_pb2 +from farm_ng.service import service_pb2_grpc from farm_ng.service.service import ServiceState @@ -95,6 +96,7 @@ def __init__(self, config: OakCameraClientConfig) -> None: # create an async connection with the server self.channel = grpc.aio.insecure_channel(self.server_address) self.stub = oak_pb2_grpc.OakServiceStub(self.channel) + self.state_stub = service_pb2_grpc.ServiceStub(self.channel) self._mono_camera_settings = oak_pb2.CameraSettings(auto_exposure=True) self._rgb_camera_settings = oak_pb2.CameraSettings(auto_exposure=True) @@ -115,15 +117,17 @@ def mono_settings(self) -> str: return self._mono_camera_settings def settings_reply(self, reply) -> None: - if reply.status == service_pb2.ReplyStatus.OK: + if reply.success: self._mono_camera_settings.CopyFrom(reply.stereo_settings) self._rgb_camera_settings.CopyFrom(reply.rgb_settings) + # TODO: Defined by ServiceMonitorClient async def get_state(self) -> ServiceState: + """Async call to retrieve the state of the connected service.""" state: ServiceState try: - response: service_pb2.GetServiceStateReply = await self.stub.getServiceState( + response: service_pb2.GetServiceStateReply = await self.state_stub.getServiceState( service_pb2.GetServiceStateRequest() ) state = ServiceState(response.state) From 95637904c27ee8d8b011eaeec12f20cce959aacd Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Mon, 16 Jan 2023 17:47:33 -0500 Subject: [PATCH 08/23] Replace ReplyStatus with bool in proto def --- protos/farm_ng/canbus/canbus.proto | 2 +- protos/farm_ng/oak/oak.proto | 2 +- protos/farm_ng/service/service.proto | 6 ------ 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/protos/farm_ng/canbus/canbus.proto b/protos/farm_ng/canbus/canbus.proto index b7b58a96..f619bb48 100644 --- a/protos/farm_ng/canbus/canbus.proto +++ b/protos/farm_ng/canbus/canbus.proto @@ -35,7 +35,7 @@ message SendCanbusMessageRequest { } message SendCanbusMessageReply { - farm_ng.service.proto.ReplyStatus status = 1; + bool success = 1; } message RawCanbusMessage { diff --git a/protos/farm_ng/oak/oak.proto b/protos/farm_ng/oak/oak.proto index f9843ebe..917b32d2 100644 --- a/protos/farm_ng/oak/oak.proto +++ b/protos/farm_ng/oak/oak.proto @@ -38,7 +38,7 @@ message CameraControlRequest { } message CameraControlReply { - farm_ng.service.proto.ReplyStatus status = 1; + bool success = 1; CameraSettings stereo_settings = 2; CameraSettings rgb_settings = 3; } diff --git a/protos/farm_ng/service/service.proto b/protos/farm_ng/service/service.proto index 65096edf..776e0cd5 100644 --- a/protos/farm_ng/service/service.proto +++ b/protos/farm_ng/service/service.proto @@ -22,12 +22,6 @@ service Service { rpc getServiceState(GetServiceStateRequest) returns (GetServiceStateReply) {} } -// TODO: Just use a bool where this is used? -enum ReplyStatus { - OK = 0; - FAILED = 1; -} - enum ServiceState { // Service state is not known UNKNOWN = 0; From d9793493b9ea9ca2b3dd3613c3589e36b0eb96bf Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Mon, 16 Jan 2023 18:34:14 -0500 Subject: [PATCH 09/23] Generic ServiceClient --- py/farm_ng/canbus/canbus_client.py | 52 +++----------- py/farm_ng/oak/camera_client.py | 57 +++------------ py/farm_ng/service/service.py | 47 ------------ py/farm_ng/service/service_client.py | 104 +++++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 137 deletions(-) delete mode 100644 py/farm_ng/service/service.py create mode 100644 py/farm_ng/service/service_client.py diff --git a/py/farm_ng/canbus/canbus_client.py b/py/farm_ng/canbus/canbus_client.py index 9ba9fa9b..b50a007a 100644 --- a/py/farm_ng/canbus/canbus_client.py +++ b/py/farm_ng/canbus/canbus_client.py @@ -12,59 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from dataclasses import dataclass -import grpc from farm_ng.canbus import canbus_pb2 from farm_ng.canbus import canbus_pb2_grpc -from farm_ng.service import service_pb2 -from farm_ng.service import service_pb2_grpc -from farm_ng.service.service import ServiceState +from farm_ng.service.service_client import ClientConfig +from farm_ng.service.service_client import ServiceClient logging.basicConfig(level=logging.INFO) -@dataclass -class CanbusClientConfig: - """Canbus client configuration. +class CanbusClient(ServiceClient): + """Amiga canbus client. - Attributes: - port (int): the port to connect to the server. - address (str): the address to connect to the server. - """ - - port: int # the port of the server address - address: str = "localhost" # the address name of the server - - -class CanbusClient: - def __init__(self, config: CanbusClientConfig) -> None: - self.config = config + Client class to connect with the Amiga brain canbus service. + Inherits from ServiceClient. - self.logger = logging.getLogger(self.__class__.__name__) + Args: + config (ClientConfig): the grpc configuration data structure. + """ + def __init__(self, config: ClientConfig) -> None: + super().__init__(config) # create a async connection with the server - self.channel = grpc.aio.insecure_channel(self.server_address) self.stub = canbus_pb2_grpc.CanbusServiceStub(self.channel) - self.state_stub = service_pb2_grpc.ServiceStub(self.channel) - - @property - def server_address(self) -> str: - """Returns the composed address and port.""" - return f"{self.config.address}:{self.config.port}" - - # TODO: Defined by ServiceMonitorClient - async def get_state(self) -> ServiceState: - state: ServiceState - try: - response: service_pb2.GetServiceStateReply = await self.state_stub.getServiceState( - service_pb2.GetServiceStateRequest() - ) - state = ServiceState(response.state) - except grpc.RpcError: - state = ServiceState() - self.logger.debug("CanbusServiceStub: port -> %i state is: %s", self.config.port, state.name) - return state def stream(self): """Return the async streaming object.""" diff --git a/py/farm_ng/oak/camera_client.py b/py/farm_ng/oak/camera_client.py index 79f3d7ec..c6d46da2 100644 --- a/py/farm_ng/oak/camera_client.py +++ b/py/farm_ng/oak/camera_client.py @@ -14,17 +14,13 @@ import asyncio import logging import time -from dataclasses import dataclass -import grpc from farm_ng.oak import oak_pb2 from farm_ng.oak import oak_pb2_grpc -from farm_ng.service import service_pb2 -from farm_ng.service import service_pb2_grpc -from farm_ng.service.service import ServiceState +from farm_ng.service.service_client import ClientConfig +from farm_ng.service.service_client import ServiceClient - -__all__ = ["OakCameraClientConfig", "OakCameraClient"] +__all__ = ["OakCameraClient"] logging.basicConfig(level=logging.INFO) @@ -65,49 +61,27 @@ def next_call_wait(self): return self.period - (time.monotonic() - self.last_call) -@dataclass -class OakCameraClientConfig: - """Camera client configuration. - - Attributes: - port (int): the port to connect to the server. - address (str): the address to connect to the server. - """ - - port: int # the port of the server address - address: str = "localhost" # the address name of the server - - -class OakCameraClient: +class OakCameraClient(ServiceClient): """Oak-D camera client. Client class to connect with the Amiga brain camera services. - Internally implements an `asyncio` gRPC channel. + Inherits from ServiceClient. Args: - config (OakCameraClientConfig): the camera configuration data structure. + config (ClientConfig): the grpc configuration data structure. """ - def __init__(self, config: OakCameraClientConfig) -> None: - self.config = config - - self.logger = logging.getLogger(self.__class__.__name__) + def __init__(self, config: ClientConfig) -> None: + super().__init__(config) # create an async connection with the server - self.channel = grpc.aio.insecure_channel(self.server_address) self.stub = oak_pb2_grpc.OakServiceStub(self.channel) - self.state_stub = service_pb2_grpc.ServiceStub(self.channel) self._mono_camera_settings = oak_pb2.CameraSettings(auto_exposure=True) self._rgb_camera_settings = oak_pb2.CameraSettings(auto_exposure=True) self.needs_update = False - @property - def server_address(self) -> str: - """Returns the composed address and port.""" - return f"{self.config.address}:{self.config.port}" - @property def rgb_settings(self) -> str: return self._rgb_camera_settings @@ -121,21 +95,6 @@ def settings_reply(self, reply) -> None: self._mono_camera_settings.CopyFrom(reply.stereo_settings) self._rgb_camera_settings.CopyFrom(reply.rgb_settings) - # TODO: Defined by ServiceMonitorClient - async def get_state(self) -> ServiceState: - - """Async call to retrieve the state of the connected service.""" - state: ServiceState - try: - response: service_pb2.GetServiceStateReply = await self.state_stub.getServiceState( - service_pb2.GetServiceStateRequest() - ) - state = ServiceState(response.state) - except grpc.RpcError: - state = ServiceState() - self.logger.debug("OakServiceStub: port -> %i state is: %s", self.config.port, state.name) - return state - async def send_settings(self) -> oak_pb2.CameraControlReply: request = oak_pb2.CameraControlRequest() request.stereo_settings.CopyFrom(self._mono_camera_settings) diff --git a/py/farm_ng/service/service.py b/py/farm_ng/service/service.py deleted file mode 100644 index cb2410ab..00000000 --- a/py/farm_ng/service/service.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (c) farm-ng, inc. -# -# Licensed under the Amiga Development Kit License (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from farm_ng.service import service_pb2 - - -class ServiceState: - """Generic service state. - - Possible state values: - - UNKNOWN: undefined state. - - RUNNING: the service is up AND streaming. - - IDLE: the service is up AND NOT streaming. - - UNAVAILABLE: the service is not available. - - ERROR: the service is an error state. - - Args: - proto (service_pb2.ServiceState): protobuf message containing the service state. - """ - - def __init__(self, proto: service_pb2.ServiceState = None) -> None: - self._proto = service_pb2.ServiceState.UNAVAILABLE - if proto is not None: - self._proto = proto - - @property - def value(self) -> int: - """Returns the state enum value.""" - return self._proto - - @property - def name(self) -> str: - """Return the state name.""" - return service_pb2.ServiceState.DESCRIPTOR.values[self.value].name - - def __repr__(self) -> str: - return f"{self.__class__.__name__}: ({self.value}, {self.name})" diff --git a/py/farm_ng/service/service_client.py b/py/farm_ng/service/service_client.py new file mode 100644 index 00000000..e2e947f7 --- /dev/null +++ b/py/farm_ng/service/service_client.py @@ -0,0 +1,104 @@ +# Copyright (c) farm-ng, inc. +# +# Licensed under the Amiga Development Kit License (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from dataclasses import dataclass + +import grpc +from farm_ng.service import service_pb2 +from farm_ng.service import service_pb2_grpc + + +@dataclass +class ClientConfig: + """Client configuration. + + Attributes: + port (int): the port to connect to the server. + address (str): the address to connect to the server. + """ + + port: int # the port of the server address + address: str = "localhost" # the address name of the server + + +class ServiceState: + """Generic service state. + + Possible state values: + - UNKNOWN: undefined state. + - RUNNING: the service is up AND streaming. + - IDLE: the service is up AND NOT streaming. + - UNAVAILABLE: the service is not available. + - ERROR: the service is an error state. + + Args: + proto (service_pb2.ServiceState): protobuf message containing the service state. + """ + + def __init__(self, proto: service_pb2.ServiceState = None) -> None: + self._proto = service_pb2.ServiceState.UNAVAILABLE + if proto is not None: + self._proto = proto + + @property + def value(self) -> int: + """Returns the state enum value.""" + return self._proto + + @property + def name(self) -> str: + """Return the state name.""" + return service_pb2.ServiceState.DESCRIPTOR.values[self.value].name + + def __repr__(self) -> str: + return f"{self.__class__.__name__}: ({self.value}, {self.name})" + + +class ServiceClient: + """Generic client. + + Generic client class to connect with the Amiga brain services. + Internally implements an `asyncio` gRPC channel. + Designed to be imported by service specific clients. + + Args: + config (ClientConfig): the grpc configuration data structure. + """ + + def __init__(self, config: ClientConfig) -> None: + print('config in ServiceClient', config) + self.config = config + + self.logger = logging.getLogger(self.__class__.__name__) + + # create an async connection with the server + self.channel = grpc.aio.insecure_channel(self.server_address) + self.state_stub = service_pb2_grpc.ServiceStub(self.channel) + + @property + def server_address(self) -> str: + """Returns the composed address and port.""" + return f"{self.config.address}:{self.config.port}" + + async def get_state(self) -> ServiceState: + state: ServiceState + try: + response: service_pb2.GetServiceStateReply = await self.state_stub.getServiceState( + service_pb2.GetServiceStateRequest() + ) + state = ServiceState(response.state) + except grpc.RpcError: + state = ServiceState() + self.logger.info(f" {self.__class__.__name__} on port: %s state is: %s", self.config.port, state.name) + return state From e7d368bbf77d4feacc9146e694456413af8833a2 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Mon, 16 Jan 2023 18:39:23 -0500 Subject: [PATCH 10/23] whoops logger spam --- py/farm_ng/service/service_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/farm_ng/service/service_client.py b/py/farm_ng/service/service_client.py index e2e947f7..9044799b 100644 --- a/py/farm_ng/service/service_client.py +++ b/py/farm_ng/service/service_client.py @@ -100,5 +100,5 @@ async def get_state(self) -> ServiceState: state = ServiceState(response.state) except grpc.RpcError: state = ServiceState() - self.logger.info(f" {self.__class__.__name__} on port: %s state is: %s", self.config.port, state.name) + self.logger.debug(f" {self.__class__.__name__} on port: %s state is: %s", self.config.port, state.name) return state From 246571c968562a69597bfc66a47f6f5d43be4254 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Mon, 16 Jan 2023 19:17:39 -0500 Subject: [PATCH 11/23] get_calibration stub wrapper --- protos/farm_ng/oak/oak.proto | 1 - py/farm_ng/oak/camera_client.py | 8 ++++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/protos/farm_ng/oak/oak.proto b/protos/farm_ng/oak/oak.proto index 917b32d2..6b23694c 100644 --- a/protos/farm_ng/oak/oak.proto +++ b/protos/farm_ng/oak/oak.proto @@ -25,7 +25,6 @@ service OakService { } message GetCalibrationRequest { - string message = 1; } message GetCalibrationReply { diff --git a/py/farm_ng/oak/camera_client.py b/py/farm_ng/oak/camera_client.py index c6d46da2..4acb1cbd 100644 --- a/py/farm_ng/oak/camera_client.py +++ b/py/farm_ng/oak/camera_client.py @@ -102,6 +102,14 @@ async def send_settings(self) -> oak_pb2.CameraControlReply: self.needs_update = False return await self.stub.cameraControl(request) + async def get_calibration(self) -> oak_pb2.GetCalibrationReply: + """Return the oak calibration as oak_pb2.GetCalibrationReply. + + Args: + request: proto defined request for oak calibration (oak_pb2.GetCalibrationRequest) + """ + return await self.stub.getCalibration(oak_pb2.GetCalibrationRequest()) + @RateLimiter(period=1) def update_rgb_settings(self, rgb_settings): self.needs_update = True From 3d4c20bf554136a223a176120a8ce35a4c47b5a3 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Mon, 16 Jan 2023 19:44:05 -0500 Subject: [PATCH 12/23] Minimal send utility --- py/farm_ng/canbus/canbus_client.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/py/farm_ng/canbus/canbus_client.py b/py/farm_ng/canbus/canbus_client.py index b50a007a..4e59fff1 100644 --- a/py/farm_ng/canbus/canbus_client.py +++ b/py/farm_ng/canbus/canbus_client.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List from farm_ng.canbus import canbus_pb2 from farm_ng.canbus import canbus_pb2_grpc @@ -39,3 +40,7 @@ def __init__(self, config: ClientConfig) -> None: def stream(self): """Return the async streaming object.""" return self.stub.streamCanbusMessages(canbus_pb2.StreamCanbusRequest()) + + def send_can_messages(self, request_iterator: List[canbus_pb2.SendCanbusMessageRequest]): + """Passes the request_iterator to the canbus service to format and send can messages to the Amiga.""" + return self.stub.sendCanbusMessage(request_iterator) From d9d5011c0038f94ae03700356f3a784468c4669f Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 13:16:53 -0500 Subject: [PATCH 13/23] Remove send_can_messages util --- py/farm_ng/canbus/canbus_client.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/py/farm_ng/canbus/canbus_client.py b/py/farm_ng/canbus/canbus_client.py index 4e59fff1..b50a007a 100644 --- a/py/farm_ng/canbus/canbus_client.py +++ b/py/farm_ng/canbus/canbus_client.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List from farm_ng.canbus import canbus_pb2 from farm_ng.canbus import canbus_pb2_grpc @@ -40,7 +39,3 @@ def __init__(self, config: ClientConfig) -> None: def stream(self): """Return the async streaming object.""" return self.stub.streamCanbusMessages(canbus_pb2.StreamCanbusRequest()) - - def send_can_messages(self, request_iterator: List[canbus_pb2.SendCanbusMessageRequest]): - """Passes the request_iterator to the canbus service to format and send can messages to the Amiga.""" - return self.stub.sendCanbusMessage(request_iterator) From 3d73c19aa3afaabc41356d8ae2a22f7bc1e2e043 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 13:48:04 -0500 Subject: [PATCH 14/23] Update tests to use generic Service state class / protos --- py/tests/test_canbus.py | 20 ++++++++++---------- py/tests/test_oak.py | 20 ++++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/py/tests/test_canbus.py b/py/tests/test_canbus.py index 54d97126..73aadad8 100644 --- a/py/tests/test_canbus.py +++ b/py/tests/test_canbus.py @@ -12,29 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest -from farm_ng.canbus import canbus_pb2 from farm_ng.canbus.canbus_client import CanbusClient -from farm_ng.canbus.canbus_client import CanbusClientConfig -from farm_ng.canbus.canbus_client import CanbusServiceState +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig +from farm_ng.service.service_client import ServiceState @pytest.fixture(name="config") -def fixture_config() -> CanbusClientConfig: - return CanbusClientConfig(port=50051) +def fixture_config() -> ClientConfig: + return ClientConfig(port=50051) class TestCanbusClient: - def test_smoke_config(self, config: CanbusClientConfig) -> None: + def test_smoke_config(self, config: ClientConfig) -> None: assert config.port == 50051 assert config.address == "localhost" - def test_smoke(self, config: CanbusClientConfig) -> None: + def test_smoke(self, config: ClientConfig) -> None: client = CanbusClient(config) assert client is not None assert client.server_address == "localhost:50051" @pytest.mark.asyncio - async def test_state(self, config: CanbusClientConfig) -> None: + async def test_state(self, config: ClientConfig) -> None: client = CanbusClient(config) - state: CanbusServiceState = await client.get_state() - assert state.value == canbus_pb2.CanbusServiceState.UNAVAILABLE + state: ServiceState = await client.get_state() + assert state.value == service_pb2.ServiceState.UNAVAILABLE diff --git a/py/tests/test_oak.py b/py/tests/test_oak.py index 4b520d5a..a785ffd3 100644 --- a/py/tests/test_oak.py +++ b/py/tests/test_oak.py @@ -12,29 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest -from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient -from farm_ng.oak.camera_client import OakCameraClientConfig -from farm_ng.oak.camera_client import OakCameraServiceState +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig +from farm_ng.service.service_client import ServiceState @pytest.fixture(name="config") -def fixture_config() -> OakCameraClientConfig: - return OakCameraClientConfig(port=50051) +def fixture_config() -> ClientConfig: + return ClientConfig(port=50051) class TestOakClient: - def test_smoke_config(self, config: OakCameraClientConfig) -> None: + def test_smoke_config(self, config: ClientConfig) -> None: assert config.port == 50051 assert config.address == "localhost" - def test_smoke(self, config: OakCameraClientConfig) -> None: + def test_smoke(self, config: ClientConfig) -> None: client = OakCameraClient(config) assert client is not None assert client.server_address == "localhost:50051" @pytest.mark.asyncio - async def test_state(self, config: OakCameraClientConfig) -> None: + async def test_state(self, config: ClientConfig) -> None: client = OakCameraClient(config) - state: OakCameraServiceState = await client.get_state() - assert state.value == oak_pb2.OakServiceState.UNAVAILABLE + state: ServiceState = await client.get_state() + assert state.value == service_pb2.OakServiceState.UNAVAILABLE From ef7b25b15dbca027127bcc89af7e51c508f8fa76 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 13:49:42 -0500 Subject: [PATCH 15/23] Missed one --- py/tests/test_oak.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/tests/test_oak.py b/py/tests/test_oak.py index a785ffd3..8354ed43 100644 --- a/py/tests/test_oak.py +++ b/py/tests/test_oak.py @@ -37,4 +37,4 @@ def test_smoke(self, config: ClientConfig) -> None: async def test_state(self, config: ClientConfig) -> None: client = OakCameraClient(config) state: ServiceState = await client.get_state() - assert state.value == service_pb2.OakServiceState.UNAVAILABLE + assert state.value == service_pb2.ServiceState.UNAVAILABLE From 62033ac6c634d06dfcaae80a1c9688adbc928b7f Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 14:22:18 -0500 Subject: [PATCH 16/23] Update camera_client example --- py/examples/camera_client/main.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/py/examples/camera_client/main.py b/py/examples/camera_client/main.py index 722d578c..879b906d 100644 --- a/py/examples/camera_client/main.py +++ b/py/examples/camera_client/main.py @@ -20,33 +20,28 @@ import numpy as np from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient -from farm_ng.oak.camera_client import OakCameraClientConfig -from farm_ng.oak.camera_client import OakCameraServiceState +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig async def main(address: str, port: int, stream_every_n: int) -> None: # configure the camera client - config = OakCameraClientConfig(address=address, port=port) + config = ClientConfig(address=address, port=port) client = OakCameraClient(config) - # get the streaming object + # Get the streaming object from the service response_stream = client.stream_frames(every_n=stream_every_n) - # start the streaming service - await client.connect_to_service() - while True: - # query the service state - # NOTE: This could be done asynchronously with client.poll_service_state() - # as in other examples, such as camera_client_gui - state: OakCameraServiceState = await client.get_state() - - if state.value != oak_pb2.OakServiceState.RUNNING: + # check the service state + state = await client.get_state() + if state.value != service_pb2.ServiceState.RUNNING: print("Camera is not streaming!") continue response: oak_pb2.StreamFramesReply = await response_stream.read() - if response and response.status == oak_pb2.ReplyStatus.OK: + + if response: # get the sync frame frame: oak_pb2.OakSyncFrame = response.frame print(f"Got frame: {frame.sequence_num}") @@ -54,8 +49,12 @@ async def main(address: str, port: int, stream_every_n: int) -> None: print(f"Timestamp: {frame.rgb.meta.timestamp}") print("#################################\n") - # cast image data bytes to numpy and decode + # Check that rgb frame is in OakSyncFrame response # NOTE: explore frame.[rgb, disparity, left, right] + if not hasattr(frame, 'rgb'): + continue + + # cast image data bytes to numpy and decode image = np.frombuffer(frame.rgb.image_data, dtype="uint8") image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED) @@ -69,7 +68,7 @@ async def main(address: str, port: int, stream_every_n: int) -> None: parser = argparse.ArgumentParser(prog="amiga-camera-app") parser.add_argument("--port", type=int, required=True, help="The camera port.") parser.add_argument("--address", type=str, default="localhost", help="The camera address") - parser.add_argument("--stream-every-n", type=int, default=4, help="Streaming frequency") + parser.add_argument("--stream-every-n", type=int, default=1, help="Streaming frequency") args = parser.parse_args() asyncio.run(main(args.address, args.port, args.stream_every_n)) From f414e0b5f3a87bf939e5e7803e7ba9013a0dfbdd Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 14:44:02 -0500 Subject: [PATCH 17/23] Update camera_client_gui example --- py/examples/camera_client_gui/main.py | 66 ++++++++++++++++----------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/py/examples/camera_client_gui/main.py b/py/examples/camera_client_gui/main.py index 6bb5647c..c1186ac9 100644 --- a/py/examples/camera_client_gui/main.py +++ b/py/examples/camera_client_gui/main.py @@ -17,9 +17,11 @@ import os from typing import List +import grpc from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient -from farm_ng.oak.camera_client import OakCameraClientConfig +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig os.environ["KIVY_NO_ARGS"] = "1" @@ -63,9 +65,6 @@ def __init__(self, address: str, port: int, stream_every_n: int) -> None: self.tasks: List[asyncio.Task] = [] - self.client: OakCameraClient - self.config: OakCameraClientConfig - def build(self): return Builder.load_string(kv) @@ -78,13 +77,11 @@ async def run_wrapper(): task.cancel() # configure the camera client - self.config = OakCameraClientConfig(address=self.address, port=self.port) - self.client = OakCameraClient(self.config) + config = ClientConfig(address=self.address, port=self.port) + client = OakCameraClient(config) # Stream camera frames - self.tasks.append(asyncio.ensure_future(self.stream_camera(self.client))) - # Continuously monitor camera service state - self.tasks.append(asyncio.ensure_future(self.client.poll_service_state())) + self.tasks.append(asyncio.ensure_future(self.stream_camera(client))) return await asyncio.gather(run_wrapper(), *self.tasks) @@ -95,27 +92,42 @@ async def stream_camera(self, client: OakCameraClient) -> None: response_stream = None while True: - if client.state.value != oak_pb2.OakServiceState.RUNNING: - # start the streaming service - await client.connect_to_service() - await asyncio.sleep(0.01) - continue - elif response_stream is None: - # get the streaming object - response_stream = client.stream_frames(every_n=self.stream_every_n) - await asyncio.sleep(0.01) + # check the state of the service + state = await client.get_state() + + if state.value not in [service_pb2.ServiceState.IDLE, service_pb2.ServiceState.RUNNING]: + # Cancel existing stream, if it exists + if response_stream is not None: + response_stream.cancel() + response_stream = None + print("Camera service is not streaming or ready to stream") + await asyncio.sleep(0.1) continue - response: oak_pb2.StreamFramesReply = await response_stream.read() - if response and response.status == oak_pb2.ReplyStatus.OK: - # get the sync frame - frame: oak_pb2.OakSyncFrame = response.frame + # Create the stream + if response_stream is None: + response_stream = client.stream_frames(every_n=1) + + try: + # try/except so app doesn't crash on killed service + response: oak_pb2.StreamFramesReply = await response_stream.read() + assert response and response != grpc.aio.EOF, "End of stream" + except Exception: + response_stream.cancel() + response_stream = None + continue - # get image and show - for view_name in ["rgb", "disparity", "left", "right"]: - self.root.ids[view_name].texture = CoreImage( - io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" - ).texture + # get the sync frame + frame: oak_pb2.OakSyncFrame = response.frame + + # get image and show + for view_name in ["rgb", "disparity", "left", "right"]: + # Skip if view_name was not included in frame + if not hasattr(frame, view_name): + continue + self.root.ids[view_name].texture = CoreImage( + io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" + ).texture if __name__ == "__main__": From 55f0fa8a79bbc2cfa7582e0582a41607e5f48d53 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 15:15:55 -0500 Subject: [PATCH 18/23] Update virtual_joystick example --- py/examples/camera_client_gui/main.py | 14 +-- py/examples/virtual_joystick/main.py | 117 +++++++++++++++----------- 2 files changed, 76 insertions(+), 55 deletions(-) diff --git a/py/examples/camera_client_gui/main.py b/py/examples/camera_client_gui/main.py index c1186ac9..0ad4188d 100644 --- a/py/examples/camera_client_gui/main.py +++ b/py/examples/camera_client_gui/main.py @@ -86,6 +86,8 @@ async def run_wrapper(): return await asyncio.gather(run_wrapper(), *self.tasks) async def stream_camera(self, client: OakCameraClient) -> None: + """This task listens to the camera client's stream and populates the tabbed panel with all 4 image streams + from the oak camera.""" while self.root is None: await asyncio.sleep(0.01) @@ -112,7 +114,8 @@ async def stream_camera(self, client: OakCameraClient) -> None: # try/except so app doesn't crash on killed service response: oak_pb2.StreamFramesReply = await response_stream.read() assert response and response != grpc.aio.EOF, "End of stream" - except Exception: + except Exception as e: + print(e) response_stream.cancel() response_stream = None continue @@ -123,11 +126,10 @@ async def stream_camera(self, client: OakCameraClient) -> None: # get image and show for view_name in ["rgb", "disparity", "left", "right"]: # Skip if view_name was not included in frame - if not hasattr(frame, view_name): - continue - self.root.ids[view_name].texture = CoreImage( - io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" - ).texture + if hasattr(frame, view_name): + self.root.ids[view_name].texture = CoreImage( + io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" + ).texture if __name__ == "__main__": diff --git a/py/examples/virtual_joystick/main.py b/py/examples/virtual_joystick/main.py index ac3f1ed7..69b98e39 100644 --- a/py/examples/virtual_joystick/main.py +++ b/py/examples/virtual_joystick/main.py @@ -14,7 +14,6 @@ import argparse import asyncio import io -import logging import os from math import sqrt from typing import Generator @@ -25,14 +24,15 @@ import grpc from farm_ng.canbus import canbus_pb2 from farm_ng.canbus.canbus_client import CanbusClient -from farm_ng.canbus.canbus_client import CanbusClientConfig from farm_ng.canbus.packet import AmigaControlState from farm_ng.canbus.packet import AmigaTpdo1 from farm_ng.canbus.packet import make_amiga_rpdo1_proto from farm_ng.canbus.packet import parse_amiga_tpdo1_proto from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient -from farm_ng.oak.camera_client import OakCameraClientConfig +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig + # Must come before kivy imports os.environ["KIVY_NO_ARGS"] = "1" @@ -257,21 +257,19 @@ async def run_wrapper() -> None: task.cancel() # configure the camera client - camera_config: OakCameraClientConfig = OakCameraClientConfig(address=self.address, port=self.camera_port) + camera_config: ClientConfig = ClientConfig(address=self.address, port=self.camera_port) camera_client: OakCameraClient = OakCameraClient(camera_config) # configure the canbus client - canbus_config: CanbusClientConfig = CanbusClientConfig(address=self.address, port=self.canbus_port) + canbus_config: ClientConfig = ClientConfig(address=self.address, port=self.canbus_port) canbus_client: CanbusClient = CanbusClient(canbus_config) # Camera task(s) self.async_tasks.append(asyncio.ensure_future(self.stream_camera(camera_client))) - self.async_tasks.append(asyncio.ensure_future(camera_client.poll_service_state())) # Canbus task(s) self.async_tasks.append(asyncio.ensure_future(self.stream_canbus(canbus_client))) - self.async_tasks.append(asyncio.ensure_future(self.send_can_msgs(canbus_client))) - self.async_tasks.append(asyncio.ensure_future(canbus_client.poll_service_state())) + canbus_client.stub.sendCanbusMessage(self.pose_generator()) # Drawing task(s) self.async_tasks.append(asyncio.ensure_future(self.draw())) @@ -291,21 +289,36 @@ async def stream_canbus(self, client: CanbusClient) -> None: response_stream: Optional[Generator[canbus_pb2.StreamCanbusReply]] = None while True: - while client.state.value != canbus_pb2.CanbusServiceState.RUNNING: - await client.connect_to_service() + # check the state of the service + state = await client.get_state() - if response_stream is None: - response_stream = client.stub.streamCanbusMessages(canbus_pb2.StreamCanbusRequest()) - - response: canbus_pb2.StreamCanbusReply = await response_stream.read() - if response == grpc.aio.EOF: - # Checks for end of stream - break - if response and response.status == canbus_pb2.ReplyStatus.OK: - for proto in response.messages.messages: - amiga_tpdo1: Optional[AmigaTpdo1] = parse_amiga_tpdo1_proto(proto) - if amiga_tpdo1: - self.amiga_tpdo1 = amiga_tpdo1 + if state.value not in [service_pb2.ServiceState.IDLE, service_pb2.ServiceState.RUNNING]: + if response_stream is not None: + response_stream.cancel() + response_stream = None + + print("Canbus service is not streaming or ready to stream") + await asyncio.sleep(0.1) + continue + + if response_stream is None and state.value != service_pb2.ServiceState.UNAVAILABLE: + # get the streaming object + response_stream = client.stream() + + try: + # try/except so app doesn't crash on killed service + response: canbus_pb2.StreamCanbusReply = await response_stream.read() + assert response and response != grpc.aio.EOF, "End of stream" + except Exception as e: + print(e) + response_stream.cancel() + response_stream = None + continue + + for proto in response.messages.messages: + amiga_tpdo1: Optional[AmigaTpdo1] = parse_amiga_tpdo1_proto(proto) + if amiga_tpdo1: + self.amiga_tpdo1 = amiga_tpdo1 async def stream_camera(self, client: OakCameraClient) -> None: """This task listens to the camera client's stream and populates the tabbed panel with all 4 image streams @@ -313,40 +326,46 @@ async def stream_camera(self, client: OakCameraClient) -> None: while self.root is None: await asyncio.sleep(0.01) - response_stream: Optional[Generator[oak_pb2.StreamFramesReply]] = None + response_stream = None while True: - while client.state.value != oak_pb2.OakServiceState.RUNNING: - # start the streaming service - await client.connect_to_service() - + # check the state of the service + state = await client.get_state() + + if state.value not in [service_pb2.ServiceState.IDLE, service_pb2.ServiceState.RUNNING]: + # Cancel existing stream, if it exists + if response_stream is not None: + response_stream.cancel() + response_stream = None + print("Camera service is not streaming or ready to stream") + await asyncio.sleep(0.1) + continue + + # Create the stream if response_stream is None: - # get the streaming object - response_stream = client.stream_frames(every_n=self.stream_every_n) - - response: oak_pb2.StreamFramesReply = await response_stream.read() - if response and response.status == oak_pb2.ReplyStatus.OK: - # get the sync frame - frame: oak_pb2.OakSyncFrame = response.frame - - # get image and show - for view_name in ["rgb", "disparity", "left", "right"]: + response_stream = client.stream_frames(every_n=1) + + try: + # try/except so app doesn't crash on killed service + response: oak_pb2.StreamFramesReply = await response_stream.read() + assert response and response != grpc.aio.EOF, "End of stream" + except Exception as e: + print(e) + response_stream.cancel() + response_stream = None + continue + + # get the sync frame + frame: oak_pb2.OakSyncFrame = response.frame + + # get image and show + for view_name in ["rgb", "disparity", "left", "right"]: + # Skip if view_name was not included in frame + if hasattr(frame, view_name): self.root.ids[view_name].texture = CoreImage( io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" ).texture - async def send_can_msgs(self, client: CanbusClient) -> None: - """This task ensures the canbus client sendCanbusMessage method has the pose_generator it will use to send - messages on the can bus.""" - while self.root is None: - await asyncio.sleep(0.01) - - while True: - if client.state.value != canbus_pb2.CanbusServiceState.RUNNING: - logging.debug("Controller requires running canbus service") - client.stub.sendCanbusMessage(self.pose_generator()) - await asyncio.sleep(0.25) - async def pose_generator(self, period: float = 0.02): """The pose generator yields an AmigaRpdo1 (auto control command) for the canbus client to send on the bus at the specified period (recommended 50hz) based on the onscreen joystick position.""" From f244dc17a3a6931f6c85b78c775282d4612db90b Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 16:03:37 -0500 Subject: [PATCH 19/23] try/except for image display --- py/examples/camera_client_gui/main.py | 4 +++- py/examples/virtual_joystick/main.py | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/py/examples/camera_client_gui/main.py b/py/examples/camera_client_gui/main.py index 0ad4188d..d64821ab 100644 --- a/py/examples/camera_client_gui/main.py +++ b/py/examples/camera_client_gui/main.py @@ -126,10 +126,12 @@ async def stream_camera(self, client: OakCameraClient) -> None: # get image and show for view_name in ["rgb", "disparity", "left", "right"]: # Skip if view_name was not included in frame - if hasattr(frame, view_name): + try: self.root.ids[view_name].texture = CoreImage( io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" ).texture + except Exception as e: + print(e) if __name__ == "__main__": diff --git a/py/examples/virtual_joystick/main.py b/py/examples/virtual_joystick/main.py index 69b98e39..7270ed9c 100644 --- a/py/examples/virtual_joystick/main.py +++ b/py/examples/virtual_joystick/main.py @@ -361,10 +361,13 @@ async def stream_camera(self, client: OakCameraClient) -> None: # get image and show for view_name in ["rgb", "disparity", "left", "right"]: # Skip if view_name was not included in frame - if hasattr(frame, view_name): + try: self.root.ids[view_name].texture = CoreImage( io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" ).texture + except Exception as e: + print(e) + async def pose_generator(self, period: float = 0.02): """The pose generator yields an AmigaRpdo1 (auto control command) for the canbus client to send on the bus From 2f78671917b32d2af6a1d9e8d116dce69976cf85 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 17:23:16 -0500 Subject: [PATCH 20/23] bi-directional stream for sending CAN msgs used in virtual joystick --- py/examples/virtual_joystick/main.py | 38 +++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/py/examples/virtual_joystick/main.py b/py/examples/virtual_joystick/main.py index 7270ed9c..77194ad6 100644 --- a/py/examples/virtual_joystick/main.py +++ b/py/examples/virtual_joystick/main.py @@ -269,7 +269,7 @@ async def run_wrapper() -> None: # Canbus task(s) self.async_tasks.append(asyncio.ensure_future(self.stream_canbus(canbus_client))) - canbus_client.stub.sendCanbusMessage(self.pose_generator()) + self.async_tasks.append(asyncio.ensure_future(self.send_can_msgs(canbus_client))) # Drawing task(s) self.async_tasks.append(asyncio.ensure_future(self.draw())) @@ -368,6 +368,42 @@ async def stream_camera(self, client: OakCameraClient) -> None: except Exception as e: print(e) + async def send_can_msgs(self, client: CanbusClient) -> None: + """This task ensures the canbus client sendCanbusMessage method has the pose_generator it will use to send + messages on the CAN bus to control the Amiga robot.""" + while self.root is None: + await asyncio.sleep(0.01) + + response_stream = None + while True: + # check the state of the service + state = await client.get_state() + + # Wait for a running CAN bus service + if state.value != service_pb2.ServiceState.RUNNING: + # Cancel existing stream, if it exists + if response_stream is not None: + response_stream.cancel() + response_stream = None + print("Waiting for running canbus service...") + await asyncio.sleep(0.1) + continue + + if response_stream is None: + print("Start sending CAN messages") + response_stream = client.stub.sendCanbusMessage(self.pose_generator()) + + try: + async for response in response_stream: + # Sit in this loop and wait until canbus service reports back it is not sending + assert response.success + except Exception as e: + print(e) + response_stream.cancel() + response_stream = None + continue + + await asyncio.sleep(0.1) async def pose_generator(self, period: float = 0.02): """The pose generator yields an AmigaRpdo1 (auto control command) for the canbus client to send on the bus From c7ad3cffe83bd09300eea17a28ef0b613719b78f Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 17:30:37 -0500 Subject: [PATCH 21/23] Try/except in camera_client for empty buffer --- py/examples/camera_client/main.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/py/examples/camera_client/main.py b/py/examples/camera_client/main.py index 879b906d..08b49ca9 100644 --- a/py/examples/camera_client/main.py +++ b/py/examples/camera_client/main.py @@ -49,19 +49,18 @@ async def main(address: str, port: int, stream_every_n: int) -> None: print(f"Timestamp: {frame.rgb.meta.timestamp}") print("#################################\n") - # Check that rgb frame is in OakSyncFrame response - # NOTE: explore frame.[rgb, disparity, left, right] - if not hasattr(frame, 'rgb'): - continue + try: + # cast image data bytes to numpy and decode + # NOTE: explore frame.[rgb, disparity, left, right] + image = np.frombuffer(frame.rgb.image_data, dtype="uint8") + image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED) - # cast image data bytes to numpy and decode - image = np.frombuffer(frame.rgb.image_data, dtype="uint8") - image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED) - - # visualize the image - cv2.namedWindow("image", cv2.WINDOW_NORMAL) - cv2.imshow("image", image) - cv2.waitKey(1) + # visualize the image + cv2.namedWindow("image", cv2.WINDOW_NORMAL) + cv2.imshow("image", image) + cv2.waitKey(1) + except Exception as e: + print(e) if __name__ == "__main__": From 399d1bdc2ceaec8454d9c6afb195e6830b43472b Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 17:42:40 -0500 Subject: [PATCH 22/23] Define sendCanbusMessage as a bi-directional RPC --- protos/farm_ng/canbus/canbus.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protos/farm_ng/canbus/canbus.proto b/protos/farm_ng/canbus/canbus.proto index f619bb48..df823834 100644 --- a/protos/farm_ng/canbus/canbus.proto +++ b/protos/farm_ng/canbus/canbus.proto @@ -20,7 +20,7 @@ service CanbusService { rpc streamCanbusMessages(StreamCanbusRequest) returns (stream StreamCanbusReply) {} rpc sendCanbusMessage(stream SendCanbusMessageRequest) - returns (SendCanbusMessageReply) {} + returns (stream SendCanbusMessageReply) {} } message StreamCanbusRequest { From 09d4c51ddc595575a2ccbd0e99f455f0fc7a2d5a Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Tue, 17 Jan 2023 19:02:32 -0500 Subject: [PATCH 23/23] Use TurboJPEG for image decoding (faster than kivy) --- py/examples/camera_client_gui/main.py | 15 ++++++++---- .../camera_client_gui/requirements.txt | 1 + py/examples/virtual_joystick/main.py | 24 ++++++++++--------- py/examples/virtual_joystick/requirements.txt | 1 + 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/py/examples/camera_client_gui/main.py b/py/examples/camera_client_gui/main.py index d64821ab..0111c9e4 100644 --- a/py/examples/camera_client_gui/main.py +++ b/py/examples/camera_client_gui/main.py @@ -13,7 +13,6 @@ # limitations under the License. import argparse import asyncio -import io import os from typing import List @@ -22,6 +21,7 @@ from farm_ng.oak.camera_client import OakCameraClient from farm_ng.service import service_pb2 from farm_ng.service.service_client import ClientConfig +from turbojpeg import TurboJPEG os.environ["KIVY_NO_ARGS"] = "1" @@ -32,7 +32,7 @@ from kivy.app import App # noqa: E402 from kivy.lang.builder import Builder # noqa: E402 -from kivy.core.image import Image as CoreImage # noqa: E402 +from kivy.graphics.texture import Texture # noqa: E402 kv = """ TabbedPanel: @@ -63,6 +63,7 @@ def __init__(self, address: str, port: int, stream_every_n: int) -> None: self.port = port self.stream_every_n = stream_every_n + self.image_decoder = TurboJPEG() self.tasks: List[asyncio.Task] = [] def build(self): @@ -127,9 +128,13 @@ async def stream_camera(self, client: OakCameraClient) -> None: for view_name in ["rgb", "disparity", "left", "right"]: # Skip if view_name was not included in frame try: - self.root.ids[view_name].texture = CoreImage( - io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" - ).texture + # Decode the image and render it in the correct kivy texture + img = self.image_decoder.decode(getattr(frame, view_name).image_data) + texture = Texture.create(size=(img.shape[1], img.shape[0]), icolorfmt="bgr") + texture.flip_vertical() + texture.blit_buffer(img.tobytes(), colorfmt="bgr", bufferfmt="ubyte", mipmap_generation=False) + self.root.ids[view_name].texture = texture + except Exception as e: print(e) diff --git a/py/examples/camera_client_gui/requirements.txt b/py/examples/camera_client_gui/requirements.txt index 3e5687e2..a974635f 100644 --- a/py/examples/camera_client_gui/requirements.txt +++ b/py/examples/camera_client_gui/requirements.txt @@ -1,2 +1,3 @@ farm-ng-amiga kivy>=2.1.0 +PyTurboJPEG diff --git a/py/examples/virtual_joystick/main.py b/py/examples/virtual_joystick/main.py index 77194ad6..73f5297e 100644 --- a/py/examples/virtual_joystick/main.py +++ b/py/examples/virtual_joystick/main.py @@ -13,7 +13,6 @@ # limitations under the License. import argparse import asyncio -import io import os from math import sqrt from typing import Generator @@ -32,6 +31,7 @@ from farm_ng.oak.camera_client import OakCameraClient from farm_ng.service import service_pb2 from farm_ng.service.service_client import ClientConfig +from turbojpeg import TurboJPEG # Must come before kivy imports @@ -47,12 +47,12 @@ Config.set("kivy", "keyboard_mode", "systemanddock") from kivy.graphics import Color, Ellipse # noqa: E402 +from kivy.graphics.texture import Texture # noqa: E402 from kivy.input.providers.mouse import MouseMotionEvent # noqa: E402 from kivy.properties import StringProperty # noqa: E402 from kivy.app import App # noqa: E402 from kivy.lang.builder import Builder # noqa: E402 from kivy.uix.widget import Widget # noqa: E402 -from kivy.core.image import Image as CoreImage # noqa: E402 from kivy.core.window import Window # noqa: E402 kv = """ @@ -163,9 +163,9 @@ def draw(self) -> None: class VirtualJoystickApp(App): # For kivy labels - amiga_speed = StringProperty() - amiga_rate = StringProperty() - amiga_state = StringProperty() + amiga_speed = StringProperty("???") + amiga_rate = StringProperty("???") + amiga_state = StringProperty("NO CANBUS\nSERVICE DETECTED") def __init__(self, address: str, camera_port: int, canbus_port: int, stream_every_n: int) -> None: super().__init__() @@ -176,14 +176,13 @@ def __init__(self, address: str, camera_port: int, canbus_port: int, stream_ever # Received values self.amiga_tpdo1: AmigaTpdo1 = AmigaTpdo1() - self.amiga_state: str = "NO CANBUS\nSERVICE DETECTED" - self.amiga_speed: str = "???" - self.amiga_rate: str = "???" # Parameters self.max_speed: float = 1.0 self.max_angular_rate: float = 1.0 + self.image_decoder = TurboJPEG() + self.async_tasks: List[asyncio.Task] = [] def build(self): @@ -362,9 +361,12 @@ async def stream_camera(self, client: OakCameraClient) -> None: for view_name in ["rgb", "disparity", "left", "right"]: # Skip if view_name was not included in frame try: - self.root.ids[view_name].texture = CoreImage( - io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" - ).texture + # Decode the image and render it in the correct kivy texture + img = self.image_decoder.decode(getattr(frame, view_name).image_data) + texture = Texture.create(size=(img.shape[1], img.shape[0]), icolorfmt="bgr") + texture.flip_vertical() + texture.blit_buffer(img.tobytes(), colorfmt="bgr", bufferfmt="ubyte", mipmap_generation=False) + self.root.ids[view_name].texture = texture except Exception as e: print(e) diff --git a/py/examples/virtual_joystick/requirements.txt b/py/examples/virtual_joystick/requirements.txt index c222a9b7..5f3cfe38 100644 --- a/py/examples/virtual_joystick/requirements.txt +++ b/py/examples/virtual_joystick/requirements.txt @@ -1,2 +1,3 @@ kivy farm-ng-amiga +PyTurboJPEG