diff --git a/README.md b/README.md index 4ae379e..5d5407c 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Visit the [Hailo Official Website](https://hailo.ai/) and [Hailo Community Forum - [Detection Example](#detection-example) - [Pose Estimation Example](#pose-estimation-example) - [Instance Segmentation Example](#instance-segmentation-example) + - [Detection as a service Example](#detection-service-example) - [CLIP Application](#clip-application) - [Frigate Integration - Coming Soon](#frigate-integration---coming-soon) - [Raspberry Pi Official Examples](#raspberry-pi-official-examples) @@ -70,6 +71,10 @@ This application includes support for using retrained detection models. For more ##### [Instance Segmentation Example](doc/basic-pipelines.md#instance-segmentation-example) ![Instance Segmentation Example](doc/images/instance_segmentation.gif) +##### [Detection Service Example](doc/basic-pipelines.md#detection-service-example) +![Detection Service Example](doc/images/detection.gif) + + #### CLIP Application CLIP (Contrastive Language-Image Pretraining) predicts the most relevant text prompt on real-time video frames using the Hailo-8L AI processor. diff --git a/community_projects/UnixDomainSocketIntegration/README.md b/community_projects/UnixDomainSocketIntegration/README.md new file mode 100644 index 0000000..707bede --- /dev/null +++ b/community_projects/UnixDomainSocketIntegration/README.md @@ -0,0 +1,46 @@ +# Hailo UnixDomainSocketIntegration example + +This exmaple is based on the detection pipeline. It is an example for converting a pipeline to event-based integration. +The demo model is Object detection, but this can be applied to any model. + +The server measures 'uptime' of each object, and takes only objects with enough 'uptime' (To ignore flicering due bad detection) + +## Installation +Follow the installation flow in the main README file, and then continue following this README file. + +### Enable SPI +```bash +sudo raspi-config +``` + +- 3 Interface Options +- I4 SPI +- Yes +- reboot + + +### Navigate to the repository directory: +```bash +cd hailo-rpi5-examples +``` + +### Environment Configuration (Required for Each New Terminal Session) +Ensure your environment is set up correctly by sourcing the provided script. This script sets the required environment variables and activates the Hailo virtual environment. If the virtual environment does not exist, it will be created automatically. +```bash +source setup_env.sh +``` +### Navigate to the example directory: +```bash +cd community_projects/UnixDomainSocketIntegration/ +``` +### Requirements Installation +Within the activated virtual environment, install the necessary Python packages: +```bash +pip install -r requirements.txt +``` + +### To Run the Simple Example: +```bash +python detection_service.py +``` +- To close the application, press `Ctrl+C`. diff --git a/community_projects/UnixDomainSocketIntegration/detection_service.py b/community_projects/UnixDomainSocketIntegration/detection_service.py new file mode 100644 index 0000000..2dd275e --- /dev/null +++ b/community_projects/UnixDomainSocketIntegration/detection_service.py @@ -0,0 +1,102 @@ +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst, GLib +import os +import numpy as np +import cv2 +import hailo +import sys + +sys.path.append('../../basic_pipelines') + +from hailo_rpi_common import ( + get_caps_from_pad, + get_numpy_from_buffer, + app_callback_class, +) +from detection_pipeline import GStreamerDetectionApp + +from unix_domain_socket_server import UnixDomainSocketServer + +# Path for the Unix Domain Socket +SOCKET_PATH = "/tmp/gst_detection.sock" + +# ----------------------------------------------------------------------------------------------- +# User-defined class to be used in the callback function +# ----------------------------------------------------------------------------------------------- +# Inheritance from the app_callback_class +class user_app_callback_class(app_callback_class): + def __init__(self): + super().__init__() + self.new_variable = 42 # New variable example + + def new_function(self): # New function example + return "The meaning of life is: " + +# ----------------------------------------------------------------------------------------------- +# User-defined callback function +# ----------------------------------------------------------------------------------------------- + +# This is the callback function that will be called when data is available from the pipeline +def app_callback(pad, info, user_data): + # Get the GstBuffer from the probe info + buffer = info.get_buffer() + # Check if the buffer is valid + if buffer is None: + return Gst.PadProbeReturn.OK + + # Using the user_data to count the number of frames + user_data.increment() + string_to_print = f"Frame count: {user_data.get_count()}\n" + + # Get the caps from the pad + format, width, height = get_caps_from_pad(pad) + + # If the user_data.use_frame is set to True, we can get the video frame from the buffer + frame = None + if user_data.use_frame and format is not None and width is not None and height is not None: + # Get video frame + frame = get_numpy_from_buffer(buffer, format, width, height) + + # Get the detections from the buffer + roi = hailo.get_roi_from_buffer(buffer) + detections = roi.get_objects_typed(hailo.HAILO_DETECTION) + + # Parse the detections + labels = [] + detection_count = 0 + for detection in detections: + label = detection.get_label() + labels.append(label) + bbox = detection.get_bbox() + confidence = detection.get_confidence() + if label == "person": + string_to_print += f"Detection: {label} {confidence:.2f}\n" + detection_count += 1 + if user_data.use_frame: + # Note: using imshow will not work here, as the callback function is not running in the main thread + # Let's print the detection count to the frame + cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) + # Example of how to use the new_variable and new_function from the user_data + # Let's print the new_variable and the result of the new_function to the frame + cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) + # Convert the frame to BGR + frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + user_data.set_frame(frame) + + # Call events server to fire event of a new detection + user_data.socket_server.send_event(labels) + + print(string_to_print) + return Gst.PadProbeReturn.OK + +if __name__ == "__main__": + # Create an instance of the user app callback class + user_data = user_app_callback_class() + + socket_server = UnixDomainSocketServer(SOCKET_PATH) + socket_server.start() + user_data.socket_server = socket_server + + app = GStreamerDetectionApp(app_callback, user_data) + app.run() diff --git a/community_projects/UnixDomainSocketIntegration/requirements.txt b/community_projects/UnixDomainSocketIntegration/requirements.txt new file mode 100644 index 0000000..db48d9c --- /dev/null +++ b/community_projects/UnixDomainSocketIntegration/requirements.txt @@ -0,0 +1,5 @@ +numpy<2.0.0 +setproctitle +opencv-python +python-dotenv +DeepDiff \ No newline at end of file diff --git a/community_projects/UnixDomainSocketIntegration/test_tools/event_listener.py b/community_projects/UnixDomainSocketIntegration/test_tools/event_listener.py new file mode 100644 index 0000000..43cd68a --- /dev/null +++ b/community_projects/UnixDomainSocketIntegration/test_tools/event_listener.py @@ -0,0 +1,52 @@ +import socket +import os +import time + +# Path to the Unix Domain Socket (ensure it matches the server's socket path) +SOCKET_PATH = "/tmp/gst_detection.sock" + +def main(): + # Create a Unix Domain Socket + client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + try: + # Connect to the server's socket + client_socket.connect(SOCKET_PATH) + print(f"Connected to Unix Domain Socket at {SOCKET_PATH}") + + # Set the socket to non-blocking mode + client_socket.setblocking(False) + + # Listen for messages from the server + while True: + try: + # Try to receive data without blocking + data = client_socket.recv(1024) + if data: + print(f"Received data: {data.decode('utf-8')}") + else: + print("Server disconnected") + break + + except BlockingIOError: + # No data available, continue loop + pass + + # You can add a small sleep to prevent busy looping + time.sleep(0.01) + + except FileNotFoundError: + print(f"Socket path '{SOCKET_PATH}' does not exist. Please ensure the server is running.") + + except ConnectionRefusedError: + print("Could not connect to the server. Please make sure the server is running and accessible.") + + except KeyboardInterrupt: + print("Client shut down by user") + + finally: + # Close the socket + client_socket.close() + +if __name__ == "__main__": + main() diff --git a/community_projects/UnixDomainSocketIntegration/unix_domain_socket_server.py b/community_projects/UnixDomainSocketIntegration/unix_domain_socket_server.py new file mode 100644 index 0000000..c06f4ca --- /dev/null +++ b/community_projects/UnixDomainSocketIntegration/unix_domain_socket_server.py @@ -0,0 +1,252 @@ +import os +import socket +import json +import threading +import logging +from datetime import datetime +from deepdiff import DeepDiff +from collections import deque + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# ----------------------------------------------------------------------------------------------- +# Unix Domain Socket Server +# ----------------------------------------------------------------------------------------------- +class UnixDomainSocketServer(threading.Thread): + UPTIME_WINDOW_SIZE = 30 # Number of events to track per object, 10 per second + APPEAR_THRESHOLD = 0.6 + DISAPPEAR_THRESHOLD = 0.3 + + def __init__(self, socket_path): + super().__init__() + self.socket_path = socket_path + self.clients = [] + self.lock = threading.Lock() + self.running = True + self.last_state = {} + self.object_logs = {} # To track detections per object + self.last_sent_visible_objects = set() # To track visible objects + self.current_visible_set = set() # To track visible objects + + # Ensure the socket does not already exist + try: + os.unlink(self.socket_path) + except FileNotFoundError: + pass + + self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.server.bind(self.socket_path) + self.server.listen(5) + self.server.settimeout(1.0) # To allow periodic checking for shutdown + logger.info(f"Unix Domain Socket Server initialized at {socket_path}") + + def run(self): + logger.info("Unix Domain Socket Server started") + while self.running: + try: + client, _ = self.server.accept() + with self.lock: + self.clients.append(client) + logger.info("New client connected.") + except socket.timeout: + continue + except Exception as e: + logger.error(f"Socket accept error: {e}") + break + + self.server.close() + logger.info("Unix Domain Socket Server shut down.") + + def send_event(self, event_payload): + """ + Sends only the differences (diffs) between the new_data and the last sent state. + Implements object uptime for visibility detection. + """ + new_state = self._event_payload_to_state(event_payload) + #differences = self.compute_differences(new_state) + + #if not differences: + # logger.info("No changes detected. No event sent.") + # return + + self.update_object_logs(new_state) + self.determine_visible_objects() + visible_objects_changed = DeepDiff(self.last_sent_visible_objects, self.current_visible_set) != {} + if not visible_objects_changed: + logger.info("No changes in visible objects. No event sent.") + return + + self.send_visible_objects() + + self.update_last_state(new_state) + + def _event_payload_to_state(self, event_payload): + """ + Converts new_data list to dictionary format with full word keys. + """ + return {'objects': [{'id': object_id} for object_id in event_payload]} + + def compute_differences(self, new_state): + """ + Computes the difference between the new_data and last_state. + """ + return DeepDiff(self.last_state, new_state, ignore_order=True) != {} + + def update_object_logs(self, new_state): + """ + Updates object logs based on detected object IDs. + """ + objects = new_state.get('objects', []) + detected_object_ids = set(object['id'] for object in objects) + for object_id in detected_object_ids: + if object_id not in self.object_logs: + self.object_logs[object_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) + self.object_logs[object_id].append(1) # Object detected + + for object_id in list(self.object_logs.keys()): + if object_id not in detected_object_ids: + self.object_logs[object_id].append(0) # Object not detected + + # Remove objects that are not detected for a long time + for object_id, log in self.object_logs.items(): + if len(log) == self.UPTIME_WINDOW_SIZE and sum(log) == 0: + del self.object_logs[object_id] + + return detected_object_ids + + def determine_visible_objects(self): + """ + Determines currently viewable objects based on uptime. + An object becomes visible once uptime_ratio >= APPEAR_THRESHOLD and remains visible until uptime_ratio < DISAPPEAR_THRESHOLD. + """ + current_visible_objects = self.last_sent_visible_objects.copy() + for object_id, log in self.object_logs.items(): + uptime_ratio = sum(log) / len(log) + if object_id in self.last_sent_visible_objects: + if uptime_ratio < self.DISAPPEAR_THRESHOLD: + disappearance_event = {'event': 'object_disappeared', 'object_id': object_id} + self._send_message(disappearance_event) + current_visible_objects.remove(object_id) + else: + if uptime_ratio >= self.APPEAR_THRESHOLD: + appearance_event = {'event': 'object_appeared', 'object_id': object_id} + self._send_message(appearance_event) + current_visible_objects.add(object_id) + + self.current_visible_set = current_visible_objects.copy() + + return list(current_visible_objects) + + def _object_existed(self, object_id): + return object_id in self.last_state.get('objects', [{}])[0] + + def has_visible_objects_changed(self, currently_visible_objects): + """ + Checks if there is a change in visible objects. + """ + if not hasattr(self, 'last_sent_visible_objects'): + self.last_sent_visible_objects = set() + current_visible_set = set(currently_visible_objects) + last_visible_set = self.last_sent_visible_objects + if current_visible_set != last_visible_set: + self.current_visible_set = current_visible_set + return True + return False + + def send_visible_objects(self): + """ + Sends the list of currently visible objects to clients. + """ + message = json.dumps({'visible_objects': list(self.current_visible_set)}, default=make_serializable) + self._send_message(message) + + def update_last_state(self, new_data_dictionary): + """ + Updates the last_state to the new_data after sending diffs. + """ + self.last_state = new_data_dictionary.copy() + self.last_sent_visible_objects = self.current_visible_set.copy() + + + def _send_message(self, message): + message_str = json.dumps(message) + "\n" + with self.lock: + for client in self.clients[:]: + try: + client.sendall(message_str.encode('utf-8')) + logger.info(f"Sent event to client: {message}") + except BrokenPipeError: + logger.warning("Client disconnected.") + self.clients.remove(client) + except Exception as e: + logger.error(f"Error sending event to client: {e}") + self.clients.remove(client) + +def shutdown(self): + logger.info("Shutting down Unix Domain Socket Server") + self.running = False + with self.lock: + for client in self.clients: + try: + client.close() + except Exception as e: + logger.error(f"Error closing client socket: {e}") + self.clients.clear() + +def make_serializable(obj): + if isinstance(obj, set): + return list(obj) + # Add other custom serialization logic as needed + return str(obj) # Fallback to string representation + +# ----------------------------------------------------------------------------------------------- +# Example Usage +# ----------------------------------------------------------------------------------------------- +if __name__ == "__main__": + import time + + # Path for the Unix Domain Socket + SOCKET_PATH = "/tmp/unix_domain_socket_example.sock" + + # Initialize and start the server + server = UnixDomainSocketServer(SOCKET_PATH) + server.start() + + try: + # Simulate data updates + current_data = { + "timestamp": datetime.now(datetime.timezone.utc).isoformat(), + "value": 100, + "objects": [{"id": "object1"}, {"id": "object2"}] + } + + while True: + # Simulate a change in data + new_value = current_data["value"] + 1 + # Example: object2 becomes object3 + new_data = { + "timestamp": datetime.now(datetime.timezone.utc).isoformat(), + "value": new_value, + "objects": [{"id": "object1"}, {"id": "object3"}] + } + + # Send only the diffs with uptime processing + server.send_event(new_data) + + # Wait for a while before next update + time.sleep(5) + + current_data = new_data + + except KeyboardInterrupt: + logger.info("Interrupt received, shutting down...") + finally: + server.shutdown() + server.join() + # Clean up the socket file + try: + os.unlink(SOCKET_PATH) + except FileNotFoundError: + pass \ No newline at end of file