diff --git a/py/examples/camera_client/main.py b/py/examples/camera_client/main.py index 722d578c..08b49ca9 100644 --- a/py/examples/camera_client/main.py +++ b/py/examples/camera_client/main.py @@ -20,33 +20,28 @@ import numpy as np from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient -from farm_ng.oak.camera_client import OakCameraClientConfig -from farm_ng.oak.camera_client import OakCameraServiceState +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig async def main(address: str, port: int, stream_every_n: int) -> None: # configure the camera client - config = OakCameraClientConfig(address=address, port=port) + config = ClientConfig(address=address, port=port) client = OakCameraClient(config) - # get the streaming object + # Get the streaming object from the service response_stream = client.stream_frames(every_n=stream_every_n) - # start the streaming service - await client.connect_to_service() - while True: - # query the service state - # NOTE: This could be done asynchronously with client.poll_service_state() - # as in other examples, such as camera_client_gui - state: OakCameraServiceState = await client.get_state() - - if state.value != oak_pb2.OakServiceState.RUNNING: + # check the service state + state = await client.get_state() + if state.value != service_pb2.ServiceState.RUNNING: print("Camera is not streaming!") continue response: oak_pb2.StreamFramesReply = await response_stream.read() - if response and response.status == oak_pb2.ReplyStatus.OK: + + if response: # get the sync frame frame: oak_pb2.OakSyncFrame = response.frame print(f"Got frame: {frame.sequence_num}") @@ -54,22 +49,25 @@ async def main(address: str, port: int, stream_every_n: int) -> None: print(f"Timestamp: {frame.rgb.meta.timestamp}") print("#################################\n") - # cast image data bytes to numpy and decode - # NOTE: explore frame.[rgb, disparity, left, right] - image = np.frombuffer(frame.rgb.image_data, dtype="uint8") - image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED) + try: + # cast image data bytes to numpy and decode + # NOTE: explore frame.[rgb, disparity, left, right] + image = np.frombuffer(frame.rgb.image_data, dtype="uint8") + image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED) - # visualize the image - cv2.namedWindow("image", cv2.WINDOW_NORMAL) - cv2.imshow("image", image) - cv2.waitKey(1) + # visualize the image + cv2.namedWindow("image", cv2.WINDOW_NORMAL) + cv2.imshow("image", image) + cv2.waitKey(1) + except Exception as e: + print(e) if __name__ == "__main__": parser = argparse.ArgumentParser(prog="amiga-camera-app") parser.add_argument("--port", type=int, required=True, help="The camera port.") parser.add_argument("--address", type=str, default="localhost", help="The camera address") - parser.add_argument("--stream-every-n", type=int, default=4, help="Streaming frequency") + parser.add_argument("--stream-every-n", type=int, default=1, help="Streaming frequency") args = parser.parse_args() asyncio.run(main(args.address, args.port, args.stream_every_n)) diff --git a/py/examples/camera_client_gui/main.py b/py/examples/camera_client_gui/main.py index 6bb5647c..0111c9e4 100644 --- a/py/examples/camera_client_gui/main.py +++ b/py/examples/camera_client_gui/main.py @@ -13,13 +13,15 @@ # limitations under the License. import argparse import asyncio -import io import os from typing import List +import grpc from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient -from farm_ng.oak.camera_client import OakCameraClientConfig +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig +from turbojpeg import TurboJPEG os.environ["KIVY_NO_ARGS"] = "1" @@ -30,7 +32,7 @@ from kivy.app import App # noqa: E402 from kivy.lang.builder import Builder # noqa: E402 -from kivy.core.image import Image as CoreImage # noqa: E402 +from kivy.graphics.texture import Texture # noqa: E402 kv = """ TabbedPanel: @@ -61,11 +63,9 @@ def __init__(self, address: str, port: int, stream_every_n: int) -> None: self.port = port self.stream_every_n = stream_every_n + self.image_decoder = TurboJPEG() self.tasks: List[asyncio.Task] = [] - self.client: OakCameraClient - self.config: OakCameraClientConfig - def build(self): return Builder.load_string(kv) @@ -78,44 +78,65 @@ async def run_wrapper(): task.cancel() # configure the camera client - self.config = OakCameraClientConfig(address=self.address, port=self.port) - self.client = OakCameraClient(self.config) + config = ClientConfig(address=self.address, port=self.port) + client = OakCameraClient(config) # Stream camera frames - self.tasks.append(asyncio.ensure_future(self.stream_camera(self.client))) - # Continuously monitor camera service state - self.tasks.append(asyncio.ensure_future(self.client.poll_service_state())) + self.tasks.append(asyncio.ensure_future(self.stream_camera(client))) return await asyncio.gather(run_wrapper(), *self.tasks) async def stream_camera(self, client: OakCameraClient) -> None: + """This task listens to the camera client's stream and populates the tabbed panel with all 4 image streams + from the oak camera.""" while self.root is None: await asyncio.sleep(0.01) response_stream = None while True: - if client.state.value != oak_pb2.OakServiceState.RUNNING: - # start the streaming service - await client.connect_to_service() - await asyncio.sleep(0.01) - continue - elif response_stream is None: - # get the streaming object - response_stream = client.stream_frames(every_n=self.stream_every_n) - await asyncio.sleep(0.01) + # check the state of the service + state = await client.get_state() + + if state.value not in [service_pb2.ServiceState.IDLE, service_pb2.ServiceState.RUNNING]: + # Cancel existing stream, if it exists + if response_stream is not None: + response_stream.cancel() + response_stream = None + print("Camera service is not streaming or ready to stream") + await asyncio.sleep(0.1) continue - response: oak_pb2.StreamFramesReply = await response_stream.read() - if response and response.status == oak_pb2.ReplyStatus.OK: - # get the sync frame - frame: oak_pb2.OakSyncFrame = response.frame + # Create the stream + if response_stream is None: + response_stream = client.stream_frames(every_n=1) + + try: + # try/except so app doesn't crash on killed service + response: oak_pb2.StreamFramesReply = await response_stream.read() + assert response and response != grpc.aio.EOF, "End of stream" + except Exception as e: + print(e) + response_stream.cancel() + response_stream = None + continue - # get image and show - for view_name in ["rgb", "disparity", "left", "right"]: - self.root.ids[view_name].texture = CoreImage( - io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" - ).texture + # get the sync frame + frame: oak_pb2.OakSyncFrame = response.frame + + # get image and show + for view_name in ["rgb", "disparity", "left", "right"]: + # Skip if view_name was not included in frame + try: + # Decode the image and render it in the correct kivy texture + img = self.image_decoder.decode(getattr(frame, view_name).image_data) + texture = Texture.create(size=(img.shape[1], img.shape[0]), icolorfmt="bgr") + texture.flip_vertical() + texture.blit_buffer(img.tobytes(), colorfmt="bgr", bufferfmt="ubyte", mipmap_generation=False) + self.root.ids[view_name].texture = texture + + except Exception as e: + print(e) if __name__ == "__main__": diff --git a/py/examples/camera_client_gui/requirements.txt b/py/examples/camera_client_gui/requirements.txt index 3e5687e2..a974635f 100644 --- a/py/examples/camera_client_gui/requirements.txt +++ b/py/examples/camera_client_gui/requirements.txt @@ -1,2 +1,3 @@ farm-ng-amiga kivy>=2.1.0 +PyTurboJPEG diff --git a/py/examples/virtual_joystick/main.py b/py/examples/virtual_joystick/main.py index ac3f1ed7..73f5297e 100644 --- a/py/examples/virtual_joystick/main.py +++ b/py/examples/virtual_joystick/main.py @@ -13,8 +13,6 @@ # limitations under the License. import argparse import asyncio -import io -import logging import os from math import sqrt from typing import Generator @@ -25,14 +23,16 @@ import grpc from farm_ng.canbus import canbus_pb2 from farm_ng.canbus.canbus_client import CanbusClient -from farm_ng.canbus.canbus_client import CanbusClientConfig from farm_ng.canbus.packet import AmigaControlState from farm_ng.canbus.packet import AmigaTpdo1 from farm_ng.canbus.packet import make_amiga_rpdo1_proto from farm_ng.canbus.packet import parse_amiga_tpdo1_proto from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient -from farm_ng.oak.camera_client import OakCameraClientConfig +from farm_ng.service import service_pb2 +from farm_ng.service.service_client import ClientConfig +from turbojpeg import TurboJPEG + # Must come before kivy imports os.environ["KIVY_NO_ARGS"] = "1" @@ -47,12 +47,12 @@ Config.set("kivy", "keyboard_mode", "systemanddock") from kivy.graphics import Color, Ellipse # noqa: E402 +from kivy.graphics.texture import Texture # noqa: E402 from kivy.input.providers.mouse import MouseMotionEvent # noqa: E402 from kivy.properties import StringProperty # noqa: E402 from kivy.app import App # noqa: E402 from kivy.lang.builder import Builder # noqa: E402 from kivy.uix.widget import Widget # noqa: E402 -from kivy.core.image import Image as CoreImage # noqa: E402 from kivy.core.window import Window # noqa: E402 kv = """ @@ -163,9 +163,9 @@ def draw(self) -> None: class VirtualJoystickApp(App): # For kivy labels - amiga_speed = StringProperty() - amiga_rate = StringProperty() - amiga_state = StringProperty() + amiga_speed = StringProperty("???") + amiga_rate = StringProperty("???") + amiga_state = StringProperty("NO CANBUS\nSERVICE DETECTED") def __init__(self, address: str, camera_port: int, canbus_port: int, stream_every_n: int) -> None: super().__init__() @@ -176,14 +176,13 @@ def __init__(self, address: str, camera_port: int, canbus_port: int, stream_ever # Received values self.amiga_tpdo1: AmigaTpdo1 = AmigaTpdo1() - self.amiga_state: str = "NO CANBUS\nSERVICE DETECTED" - self.amiga_speed: str = "???" - self.amiga_rate: str = "???" # Parameters self.max_speed: float = 1.0 self.max_angular_rate: float = 1.0 + self.image_decoder = TurboJPEG() + self.async_tasks: List[asyncio.Task] = [] def build(self): @@ -257,21 +256,19 @@ async def run_wrapper() -> None: task.cancel() # configure the camera client - camera_config: OakCameraClientConfig = OakCameraClientConfig(address=self.address, port=self.camera_port) + camera_config: ClientConfig = ClientConfig(address=self.address, port=self.camera_port) camera_client: OakCameraClient = OakCameraClient(camera_config) # configure the canbus client - canbus_config: CanbusClientConfig = CanbusClientConfig(address=self.address, port=self.canbus_port) + canbus_config: ClientConfig = ClientConfig(address=self.address, port=self.canbus_port) canbus_client: CanbusClient = CanbusClient(canbus_config) # Camera task(s) self.async_tasks.append(asyncio.ensure_future(self.stream_camera(camera_client))) - self.async_tasks.append(asyncio.ensure_future(camera_client.poll_service_state())) # Canbus task(s) self.async_tasks.append(asyncio.ensure_future(self.stream_canbus(canbus_client))) self.async_tasks.append(asyncio.ensure_future(self.send_can_msgs(canbus_client))) - self.async_tasks.append(asyncio.ensure_future(canbus_client.poll_service_state())) # Drawing task(s) self.async_tasks.append(asyncio.ensure_future(self.draw())) @@ -291,21 +288,36 @@ async def stream_canbus(self, client: CanbusClient) -> None: response_stream: Optional[Generator[canbus_pb2.StreamCanbusReply]] = None while True: - while client.state.value != canbus_pb2.CanbusServiceState.RUNNING: - await client.connect_to_service() + # check the state of the service + state = await client.get_state() - if response_stream is None: - response_stream = client.stub.streamCanbusMessages(canbus_pb2.StreamCanbusRequest()) - - response: canbus_pb2.StreamCanbusReply = await response_stream.read() - if response == grpc.aio.EOF: - # Checks for end of stream - break - if response and response.status == canbus_pb2.ReplyStatus.OK: - for proto in response.messages.messages: - amiga_tpdo1: Optional[AmigaTpdo1] = parse_amiga_tpdo1_proto(proto) - if amiga_tpdo1: - self.amiga_tpdo1 = amiga_tpdo1 + if state.value not in [service_pb2.ServiceState.IDLE, service_pb2.ServiceState.RUNNING]: + if response_stream is not None: + response_stream.cancel() + response_stream = None + + print("Canbus service is not streaming or ready to stream") + await asyncio.sleep(0.1) + continue + + if response_stream is None and state.value != service_pb2.ServiceState.UNAVAILABLE: + # get the streaming object + response_stream = client.stream() + + try: + # try/except so app doesn't crash on killed service + response: canbus_pb2.StreamCanbusReply = await response_stream.read() + assert response and response != grpc.aio.EOF, "End of stream" + except Exception as e: + print(e) + response_stream.cancel() + response_stream = None + continue + + for proto in response.messages.messages: + amiga_tpdo1: Optional[AmigaTpdo1] = parse_amiga_tpdo1_proto(proto) + if amiga_tpdo1: + self.amiga_tpdo1 = amiga_tpdo1 async def stream_camera(self, client: OakCameraClient) -> None: """This task listens to the camera client's stream and populates the tabbed panel with all 4 image streams @@ -313,39 +325,87 @@ async def stream_camera(self, client: OakCameraClient) -> None: while self.root is None: await asyncio.sleep(0.01) - response_stream: Optional[Generator[oak_pb2.StreamFramesReply]] = None + response_stream = None while True: - while client.state.value != oak_pb2.OakServiceState.RUNNING: - # start the streaming service - await client.connect_to_service() - + # check the state of the service + state = await client.get_state() + + if state.value not in [service_pb2.ServiceState.IDLE, service_pb2.ServiceState.RUNNING]: + # Cancel existing stream, if it exists + if response_stream is not None: + response_stream.cancel() + response_stream = None + print("Camera service is not streaming or ready to stream") + await asyncio.sleep(0.1) + continue + + # Create the stream if response_stream is None: - # get the streaming object - response_stream = client.stream_frames(every_n=self.stream_every_n) - - response: oak_pb2.StreamFramesReply = await response_stream.read() - if response and response.status == oak_pb2.ReplyStatus.OK: - # get the sync frame - frame: oak_pb2.OakSyncFrame = response.frame - - # get image and show - for view_name in ["rgb", "disparity", "left", "right"]: - self.root.ids[view_name].texture = CoreImage( - io.BytesIO(getattr(frame, view_name).image_data), ext="jpg" - ).texture + response_stream = client.stream_frames(every_n=1) + + try: + # try/except so app doesn't crash on killed service + response: oak_pb2.StreamFramesReply = await response_stream.read() + assert response and response != grpc.aio.EOF, "End of stream" + except Exception as e: + print(e) + response_stream.cancel() + response_stream = None + continue + + # get the sync frame + frame: oak_pb2.OakSyncFrame = response.frame + + # get image and show + for view_name in ["rgb", "disparity", "left", "right"]: + # Skip if view_name was not included in frame + try: + # Decode the image and render it in the correct kivy texture + img = self.image_decoder.decode(getattr(frame, view_name).image_data) + texture = Texture.create(size=(img.shape[1], img.shape[0]), icolorfmt="bgr") + texture.flip_vertical() + texture.blit_buffer(img.tobytes(), colorfmt="bgr", bufferfmt="ubyte", mipmap_generation=False) + self.root.ids[view_name].texture = texture + except Exception as e: + print(e) async def send_can_msgs(self, client: CanbusClient) -> None: """This task ensures the canbus client sendCanbusMessage method has the pose_generator it will use to send - messages on the can bus.""" + messages on the CAN bus to control the Amiga robot.""" while self.root is None: await asyncio.sleep(0.01) + response_stream = None while True: - if client.state.value != canbus_pb2.CanbusServiceState.RUNNING: - logging.debug("Controller requires running canbus service") - client.stub.sendCanbusMessage(self.pose_generator()) - await asyncio.sleep(0.25) + # check the state of the service + state = await client.get_state() + + # Wait for a running CAN bus service + if state.value != service_pb2.ServiceState.RUNNING: + # Cancel existing stream, if it exists + if response_stream is not None: + response_stream.cancel() + response_stream = None + print("Waiting for running canbus service...") + await asyncio.sleep(0.1) + continue + + if response_stream is None: + print("Start sending CAN messages") + response_stream = client.stub.sendCanbusMessage(self.pose_generator()) + + try: + async for response in response_stream: + # Sit in this loop and wait until canbus service reports back it is not sending + assert response.success + except Exception as e: + print(e) + response_stream.cancel() + response_stream = None + continue + + await asyncio.sleep(0.1) async def pose_generator(self, period: float = 0.02): """The pose generator yields an AmigaRpdo1 (auto control command) for the canbus client to send on the bus diff --git a/py/examples/virtual_joystick/requirements.txt b/py/examples/virtual_joystick/requirements.txt index c222a9b7..5f3cfe38 100644 --- a/py/examples/virtual_joystick/requirements.txt +++ b/py/examples/virtual_joystick/requirements.txt @@ -1,2 +1,3 @@ kivy farm-ng-amiga +PyTurboJPEG