From 0955051bdcfc222b3f886aec3de89a43b25fc347 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Thu, 3 Oct 2024 02:32:56 +0300 Subject: [PATCH 01/27] feat(haas): hailo as a service setup, unix domain event based --- basic_pipelines/detection_service.py | 319 +++++++++++++++++++++++++++ requirements.txt | 1 + 2 files changed, 320 insertions(+) create mode 100644 basic_pipelines/detection_service.py diff --git a/basic_pipelines/detection_service.py b/basic_pipelines/detection_service.py new file mode 100644 index 0000000..601c084 --- /dev/null +++ b/basic_pipelines/detection_service.py @@ -0,0 +1,319 @@ +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst, GLib +import os +import json +import argparse +import multiprocessing +import numpy as np +import setproctitle +import cv2 +import socket +import time +import hailo +import logging # Import the logging module +from datetime import datetime + +from hailo_rpi_common import ( + get_default_parser, + QUEUE, + get_caps_from_pad, + get_numpy_from_buffer, + GStreamerApp, + app_callback_class, +) + +import threading + +# Path for the Unix Domain Socket +SOCKET_PATH = "/tmp/gst_detection.sock" + +# ----------------------------------------------------------------------------------------------- +# Logger Setup +# ----------------------------------------------------------------------------------------------- +log_filename = f"app_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" +logging.basicConfig( + filename=log_filename, + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', +) +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------------------------- +# User-defined class to be used in the callback function +# ----------------------------------------------------------------------------------------------- +# Inheritance from the app_callback_class +class user_app_callback_class(app_callback_class): + def __init__(self): + super().__init__() + self.new_variable = 42 # New variable example + + def new_function(self): # New function example + return "The meaning of life is: " + +# ----------------------------------------------------------------------------------------------- +# Unix Domain Socket Server +# ----------------------------------------------------------------------------------------------- +class UnixDomainSocketServer(threading.Thread): + def __init__(self, socket_path): + super().__init__() + self.socket_path = socket_path + self.clients = [] + self.lock = threading.Lock() + self.running = True + + # Ensure the socket does not already exist + try: + os.unlink(self.socket_path) + except FileNotFoundError: + pass + + self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.server.bind(self.socket_path) + self.server.listen(5) + self.server.settimeout(1.0) # To allow periodic checking for shutdown + logger.info(f"Unix Domain Socket Server initialized at {socket_path}") + + def run(self): + logger.info("Unix Domain Socket Server started") + while self.running: + try: + client, _ = self.server.accept() + with self.lock: + self.clients.append(client) + logger.info("New client connected.") + except socket.timeout: + continue + except Exception as e: + logger.error(f"Socket accept error: {e}") + break + + self.server.close() + logger.info("Unix Domain Socket Server shut down.") + + def send_event(self, data): + message = json.dumps(data) + "\n" + with self.lock: + for client in self.clients[:]: + try: + client.sendall(message.encode('utf-8')) + except BrokenPipeError: + logger.warning("Client disconnected.") + self.clients.remove(client) + except Exception as e: + logger.error(f"Error sending data to client: {e}") + self.clients.remove(client) + + def shutdown(self): + logger.info("Shutting down Unix Domain Socket Server") + self.running = False + with self.lock: + for client in self.clients: + try: + client.close() + except: + pass + self.clients.clear() + + +# ----------------------------------------------------------------------------------------------- +# User-defined callback function +# ----------------------------------------------------------------------------------------------- + +# This is the callback function that will be called when data is available from the pipeline +def app_callback(pad, info, user_data): + # Get the GstBuffer from the probe info + buffer = info.get_buffer() + # Check if the buffer is valid + if buffer is None: + return Gst.PadProbeReturn.OK + + # Using the user_data to count the number of frames + user_data.increment() + string_to_print = f"Frame count: {user_data.get_count()}\n" + + # Get the caps from the pad + format, width, height = get_caps_from_pad(pad) + + # If the user_data.use_frame is set to True, we can get the video frame from the buffer + frame = None + if user_data.use_frame and format is not None and width is not None and height is not None: + # Get video frame + frame = get_numpy_from_buffer(buffer, format, width, height) + + # Get the detections from the buffer + roi = hailo.get_roi_from_buffer(buffer) + detections = roi.get_objects_typed(hailo.HAILO_DETECTION) + + # Parse the detections + detection_count = 0 + for detection in detections: + label = detection.get_label() + bbox = detection.get_bbox() + confidence = detection.get_confidence() + if label == "person": + string_to_print += f"Detection: {label} {confidence:.2f}\n" + detection_count += 1 + if user_data.use_frame: + # Note: using imshow will not work here, as the callback function is not running in the main thread + # Let's print the detection count to the frame + cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) + # Example of how to use the new_variable and new_function from the user_data + # Let's print the new_variable and the result of the new_function to the frame + cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) + # Convert the frame to BGR + frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + user_data.set_frame(frame) + + labels = [detection.get_label() for detection in detections] + + user_data.socket_server.send_event(labels) + + + print(string_to_print) + return Gst.PadProbeReturn.OK + + +# ----------------------------------------------------------------------------------------------- +# User Gstreamer Application +# ----------------------------------------------------------------------------------------------- + +# This class inherits from the hailo_rpi_common.GStreamerApp class +class GStreamerDetectionApp(GStreamerApp): + def __init__(self, args, user_data): + # Call the parent class constructor + super().__init__(args, user_data) + # Additional initialization code can be added here + # Set Hailo parameters these parameters should be set based on the model used + self.batch_size = 2 + self.network_width = 640 + self.network_height = 640 + self.network_format = "RGB" + nms_score_threshold = 0.3 + nms_iou_threshold = 0.45 + + # Temporary code: new postprocess will be merged to TAPPAS. + # Check if new postprocess so file exists + new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so') + if os.path.exists(new_postprocess_path): + self.default_postprocess_so = new_postprocess_path + else: + self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so') + + if args.hef_path is not None: + self.hef_path = args.hef_path + # Set the HEF file path based on the network + elif args.network == "yolov6n": + self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef') + elif args.network == "yolov8s": + self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef') + elif args.network == "yolox_s_leaky": + self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef') + else: + assert False, "Invalid network type" + + # User-defined label JSON file + if args.labels_json is not None: + self.labels_config = f' config-path={args.labels_json} ' + else: + self.labels_config = '' + + self.app_callback = app_callback + + self.thresholds_str = ( + f"nms-score-threshold={nms_score_threshold} " + f"nms-iou-threshold={nms_iou_threshold} " + f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32" + ) + + # Set the process title + setproctitle.setproctitle("Hailo Detection App") + + self.create_pipeline() + + def get_pipeline_string(self): + if self.source_type == "rpi": + source_element = ( + "libcamerasrc name=src_0 ! " + f"video/x-raw, format={self.network_format}, width=1536, height=864 ! " + + QUEUE("queue_src_scale") + + "videoscale ! " + f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! " + ) + elif self.source_type == "usb": + source_element = ( + f"v4l2src device={self.video_source} name=src_0 ! " + "video/x-raw, width=640, height=480, framerate=30/1 ! " + ) + else: + source_element = ( + f"filesrc location=\"{self.video_source}\" name=src_0 ! " + + QUEUE("queue_dec264") + + " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! " + " video/x-raw, format=I420 ! " + ) + source_element += QUEUE("queue_scale") + source_element += "videoscale n-threads=2 ! " + source_element += QUEUE("queue_src_convert") + source_element += "videoconvert n-threads=3 name=src_convert qos=false ! " + source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! " + + pipeline_string = ( + "hailomuxer name=hmux " + + source_element + + "tee name=t ! " + + QUEUE("bypass_queue", max_size_buffers=20) + + "hmux.sink_0 " + + "t. ! " + + QUEUE("queue_hailonet") + + "videoconvert n-threads=3 ! " + f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! " + + QUEUE("queue_hailofilter") + + f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! " + + QUEUE("queue_hmuc") + + "hmux.sink_1 " + + "hmux. ! " + + QUEUE("queue_hailo_python") + + QUEUE("queue_user_callback") + + "identity name=identity_callback ! " + + QUEUE("queue_hailooverlay") + + "hailooverlay ! " + + QUEUE("queue_videoconvert") + + "videoconvert n-threads=3 qos=false ! " + + QUEUE("queue_hailo_display") + + f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true " + ) + print(pipeline_string) + return pipeline_string + +if __name__ == "__main__": + # Create an instance of the user app callback class + user_data = user_app_callback_class() + + socket_server = UnixDomainSocketServer(SOCKET_PATH) + socket_server.start() + user_data.socket_server = socket_server + + + parser = get_default_parser() + # Add additional arguments here + parser.add_argument( + "--network", + default="yolov6n", + choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'], + help="Which Network to use, default is yolov6n", + ) + parser.add_argument( + "--hef-path", + default=None, + help="Path to HEF file", + ) + parser.add_argument( + "--labels-json", + default=None, + help="Path to costume labels JSON file", + ) + args = parser.parse_args() + app = GStreamerDetectionApp(args, user_data) + app.run() diff --git a/requirements.txt b/requirements.txt index a683e20..06ae3c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ numpy<2.0.0 setproctitle opencv-python +python-dotenv From 6a8f9d8ad4f81ca6291ca3a07d6ef49f8e0288b8 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Thu, 3 Oct 2024 08:39:33 +0300 Subject: [PATCH 02/27] fix(haas): adding testing tool for event mode" --- test_tools/event_listener.py | 40 ++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100755 test_tools/event_listener.py diff --git a/test_tools/event_listener.py b/test_tools/event_listener.py new file mode 100755 index 0000000..df95ce9 --- /dev/null +++ b/test_tools/event_listener.py @@ -0,0 +1,40 @@ +import socket +import os + +# Path to the Unix Domain Socket (ensure it matches the server's socket path) +SOCKET_PATH = "/tmp/gst_detection.sock" + +def main(): + # Create a Unix Domain Socket + client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + try: + # Connect to the server's socket + client_socket.connect(SOCKET_PATH) + print(f"Connected to Unix Domain Socket at {SOCKET_PATH}") + + # Listen for messages from the server + while True: + data = client_socket.recv(1024) + if not data: + print("Server disconnected") + break + + # Print received data + print(f"Received data: {data.decode('utf-8')}") + + except FileNotFoundError: + print(f"Socket path '{SOCKET_PATH}' does not exist. Please ensure the server is running.") + + except ConnectionRefusedError: + print("Could not connect to the server. Please make sure the server is running and accessible.") + + except KeyboardInterrupt: + print("Client shut down by user") + + finally: + # Close the socket + client_socket.close() + +if __name__ == "__main__": + main() From e5988ea6dc6b98bb4788fdde060e55ebc99cd3c9 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Fri, 4 Oct 2024 02:15:41 +0300 Subject: [PATCH 03/27] feat(haas): non-blocking loop for event communication test example --- test_tools/event_listener.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/test_tools/event_listener.py b/test_tools/event_listener.py index df95ce9..43cd68a 100755 --- a/test_tools/event_listener.py +++ b/test_tools/event_listener.py @@ -1,5 +1,6 @@ import socket import os +import time # Path to the Unix Domain Socket (ensure it matches the server's socket path) SOCKET_PATH = "/tmp/gst_detection.sock" @@ -13,16 +14,27 @@ def main(): client_socket.connect(SOCKET_PATH) print(f"Connected to Unix Domain Socket at {SOCKET_PATH}") + # Set the socket to non-blocking mode + client_socket.setblocking(False) + # Listen for messages from the server while True: - data = client_socket.recv(1024) - if not data: - print("Server disconnected") - break + try: + # Try to receive data without blocking + data = client_socket.recv(1024) + if data: + print(f"Received data: {data.decode('utf-8')}") + else: + print("Server disconnected") + break + + except BlockingIOError: + # No data available, continue loop + pass + + # You can add a small sleep to prevent busy looping + time.sleep(0.01) - # Print received data - print(f"Received data: {data.decode('utf-8')}") - except FileNotFoundError: print(f"Socket path '{SOCKET_PATH}' does not exist. Please ensure the server is running.") From 21e120de808de5bc47050efa3b3e648f05d5e2e3 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Fri, 4 Oct 2024 02:29:29 +0300 Subject: [PATCH 04/27] chore(haas): move unix domain events server to a module in a different file" --- basic_pipelines/detection_service.py | 72 ++----------------- basic_pipelines/unix_domain_socket_server.py | 73 ++++++++++++++++++++ 2 files changed, 78 insertions(+), 67 deletions(-) create mode 100644 basic_pipelines/unix_domain_socket_server.py diff --git a/basic_pipelines/detection_service.py b/basic_pipelines/detection_service.py index 601c084..5538dc8 100644 --- a/basic_pipelines/detection_service.py +++ b/basic_pipelines/detection_service.py @@ -11,7 +11,7 @@ import socket import time import hailo -import logging # Import the logging module +import logging from datetime import datetime from hailo_rpi_common import ( @@ -23,6 +23,8 @@ app_callback_class, ) +from unix_domain_socket_server import UnixDomainSocketServer + import threading # Path for the Unix Domain Socket @@ -52,71 +54,6 @@ def __init__(self): def new_function(self): # New function example return "The meaning of life is: " -# ----------------------------------------------------------------------------------------------- -# Unix Domain Socket Server -# ----------------------------------------------------------------------------------------------- -class UnixDomainSocketServer(threading.Thread): - def __init__(self, socket_path): - super().__init__() - self.socket_path = socket_path - self.clients = [] - self.lock = threading.Lock() - self.running = True - - # Ensure the socket does not already exist - try: - os.unlink(self.socket_path) - except FileNotFoundError: - pass - - self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.server.bind(self.socket_path) - self.server.listen(5) - self.server.settimeout(1.0) # To allow periodic checking for shutdown - logger.info(f"Unix Domain Socket Server initialized at {socket_path}") - - def run(self): - logger.info("Unix Domain Socket Server started") - while self.running: - try: - client, _ = self.server.accept() - with self.lock: - self.clients.append(client) - logger.info("New client connected.") - except socket.timeout: - continue - except Exception as e: - logger.error(f"Socket accept error: {e}") - break - - self.server.close() - logger.info("Unix Domain Socket Server shut down.") - - def send_event(self, data): - message = json.dumps(data) + "\n" - with self.lock: - for client in self.clients[:]: - try: - client.sendall(message.encode('utf-8')) - except BrokenPipeError: - logger.warning("Client disconnected.") - self.clients.remove(client) - except Exception as e: - logger.error(f"Error sending data to client: {e}") - self.clients.remove(client) - - def shutdown(self): - logger.info("Shutting down Unix Domain Socket Server") - self.running = False - with self.lock: - for client in self.clients: - try: - client.close() - except: - pass - self.clients.clear() - - # ----------------------------------------------------------------------------------------------- # User-defined callback function # ----------------------------------------------------------------------------------------------- @@ -168,9 +105,9 @@ def app_callback(pad, info, user_data): labels = [detection.get_label() for detection in detections] + # Call events server to fire event of a new detection user_data.socket_server.send_event(labels) - print(string_to_print) return Gst.PadProbeReturn.OK @@ -291,6 +228,7 @@ def get_pipeline_string(self): # Create an instance of the user app callback class user_data = user_app_callback_class() + # Set up events server socket_server = UnixDomainSocketServer(SOCKET_PATH) socket_server.start() user_data.socket_server = socket_server diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py new file mode 100644 index 0000000..1fee222 --- /dev/null +++ b/basic_pipelines/unix_domain_socket_server.py @@ -0,0 +1,73 @@ +import os +import socket +import json +import threading +import logging + +from datetime import datetime + + +# ----------------------------------------------------------------------------------------------- +# Unix Domain Socket Server +# ----------------------------------------------------------------------------------------------- +class UnixDomainSocketServer(threading.Thread): + def __init__(self, socket_path): + super().__init__() + self.socket_path = socket_path + self.clients = [] + self.lock = threading.Lock() + self.running = True + + # Ensure the socket does not already exist + try: + os.unlink(self.socket_path) + except FileNotFoundError: + pass + + self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.server.bind(self.socket_path) + self.server.listen(5) + self.server.settimeout(1.0) # To allow periodic checking for shutdown + logger.info(f"Unix Domain Socket Server initialized at {socket_path}") + + def run(self): + logger.info("Unix Domain Socket Server started") + while self.running: + try: + client, _ = self.server.accept() + with self.lock: + self.clients.append(client) + logger.info("New client connected.") + except socket.timeout: + continue + except Exception as e: + logger.error(f"Socket accept error: {e}") + break + + self.server.close() + logger.info("Unix Domain Socket Server shut down.") + + def send_event(self, data): + message = json.dumps(data) + "\n" + with self.lock: + for client in self.clients[:]: + try: + client.sendall(message.encode('utf-8')) + except BrokenPipeError: + logger.warning("Client disconnected.") + self.clients.remove(client) + except Exception as e: + logger.error(f"Error sending data to client: {e}") + self.clients.remove(client) + + def shutdown(self): + logger.info("Shutting down Unix Domain Socket Server") + self.running = False + with self.lock: + for client in self.clients: + try: + client.close() + except: + pass + self.clients.clear() + From c720ebe006e7bb9f47ca81768c80453a79a778b1 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Fri, 4 Oct 2024 02:41:38 +0300 Subject: [PATCH 05/27] chore(haas): update documentation with new events module --- README.md | 5 +++++ doc/basic-pipelines.md | 10 ++++++++++ 2 files changed, 15 insertions(+) diff --git a/README.md b/README.md index 45d12d3..7173405 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Visit the [Hailo Official Website](https://hailo.ai/) and [Hailo Community Forum - [Detection Example](#detection-example) - [Pose Estimation Example](#pose-estimation-example) - [Instance Segmentation Example](#instance-segmentation-example) + - [Detection as a service Example](#detection-service-example) - [CLIP Application](#clip-application) - [Frigate Integration - Coming Soon](#frigate-integration---coming-soon) - [Raspberry Pi Official Examples](#raspberry-pi-official-examples) @@ -68,6 +69,10 @@ This application includes support for using retrained detection models. For more ##### [Instance Segmentation Example](doc/basic-pipelines.md#instance-segmentation-example) ![Instance Segmentation Example](doc/images/instance_segmentation.gif) +##### [Detection Service Example](doc/basic-pipelines.md#detection-service-example) +![Detection Service Example](doc/images/detection.gif) + + #### CLIP Application CLIP (Contrastive Language-Image Pretraining) predicts the most relevant text prompt on real-time video frames using the Hailo-8L AI processor. diff --git a/doc/basic-pipelines.md b/doc/basic-pipelines.md index cb4ce20..7dd85ff 100644 --- a/doc/basic-pipelines.md +++ b/doc/basic-pipelines.md @@ -206,6 +206,16 @@ Here is an example output of the detection pipeline graph: ![detection_pipeline](images/detection_pipeline.png) Tip: Right click on the image and select "Open image in new tab" to see the full image. +##### [Detection Example](doc/basic-pipelines.md#detection-example) +![Detection Example](doc/images/detection.gif) + +# Detection-service Example +![Banner](images/detection_Servier.gif) + +This mode allows integration with minimal changes to your app. +You can test out the integration, as well as see a demo how to integrate it in your app by looking at **test_tools**. +The test tool allows for integrating with either holding for next input, or continuing. + # Troubleshoting and Known Issues If you encounter any issues, please open a ticket in the [Hailo Community Forum](https://community.hailo.ai/). It is full with useful information and might already include the solution to your problem. From 8a8ce02554b1e1318813c3dd3da2ef5ed287cb01 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 5 Oct 2024 23:18:50 +0300 Subject: [PATCH 06/27] feat(haas): event on changes only --- basic_pipelines/unix_domain_socket_server.py | 80 ++++++++++++++++++-- requirements.txt | 1 + 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 1fee222..8613113 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -3,9 +3,12 @@ import json import threading import logging - from datetime import datetime +from deepdiff import DeepDiff # You need to install this package +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------------------------- # Unix Domain Socket Server @@ -17,6 +20,7 @@ def __init__(self, socket_path): self.clients = [] self.lock = threading.Lock() self.running = True + self.last_state = {} # Initialize last_state to keep track of previous data # Ensure the socket does not already exist try: @@ -47,12 +51,24 @@ def run(self): self.server.close() logger.info("Unix Domain Socket Server shut down.") - def send_event(self, data): - message = json.dumps(data) + "\n" + def send_event(self, new_data): + """ + Sends only the differences (diffs) between the new_data and the last sent state. + """ + # Compute the difference between new_data and last_state + diff = DeepDiff(self.last_state, new_data, ignore_order=True).to_dict() + + if not diff: + logger.info("No changes detected. No event sent.") + return # No changes to send + + message = json.dumps(diff, default=make_serializable) + "\n" + with self.lock: for client in self.clients[:]: try: client.sendall(message.encode('utf-8')) + logger.info(f"Sent diff to client: {diff}") except BrokenPipeError: logger.warning("Client disconnected.") self.clients.remove(client) @@ -60,6 +76,9 @@ def send_event(self, data): logger.error(f"Error sending data to client: {e}") self.clients.remove(client) + # Update the last_state to the new_data after sending diffs + self.last_state = new_data.copy() + def shutdown(self): logger.info("Shutting down Unix Domain Socket Server") self.running = False @@ -67,7 +86,58 @@ def shutdown(self): for client in self.clients: try: client.close() - except: - pass + except Exception as e: + logger.error(f"Error closing client socket: {e}") self.clients.clear() +def make_serializable(obj): + if isinstance(obj, set): + return list(obj) + # Add other custom serialization logic as needed + return str(obj) # Fallback to string representation + +# ----------------------------------------------------------------------------------------------- +# Example Usage +# ----------------------------------------------------------------------------------------------- +if __name__ == "__main__": + import time + + # Path for the Unix Domain Socket + SOCKET_PATH = "/tmp/unix_domain_socket_example.sock" + # Initialize and start the server + server = UnixDomainSocketServer(SOCKET_PATH) + server.start() + + try: + # Simulate data updates + current_data = { + "timestamp": datetime.utcnow().isoformat(), + "value": 100 + } + + while True: + # Simulate a change in data + new_value = current_data["value"] + 1 + new_data = { + "timestamp": datetime.utcnow().isoformat(), + "value": new_value + } + + # Send only the diffs + server.send_event(new_data) + + # Wait for a while before next update + time.sleep(5) + + current_data = new_data + + except KeyboardInterrupt: + logger.info("Interrupt received, shutting down...") + finally: + server.shutdown() + server.join() + # Clean up the socket file + try: + os.unlink(SOCKET_PATH) + except FileNotFoundError: + pass diff --git a/requirements.txt b/requirements.txt index 06ae3c2..d213671 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ numpy<2.0.0 setproctitle opencv-python python-dotenv +DeepDiff From 72e6cfa37629b40658ab13c29f267ffb1b2e6151 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 19 Oct 2024 23:18:00 +0300 Subject: [PATCH 07/27] chore(haas): update example per new template --- basic_pipelines/detection_service.py | 168 +-------------------------- 1 file changed, 2 insertions(+), 166 deletions(-) diff --git a/basic_pipelines/detection_service.py b/basic_pipelines/detection_service.py index 5538dc8..cbc1e35 100644 --- a/basic_pipelines/detection_service.py +++ b/basic_pipelines/detection_service.py @@ -2,46 +2,18 @@ gi.require_version('Gst', '1.0') from gi.repository import Gst, GLib import os -import json -import argparse -import multiprocessing import numpy as np -import setproctitle import cv2 -import socket -import time import hailo -import logging -from datetime import datetime - from hailo_rpi_common import ( - get_default_parser, - QUEUE, get_caps_from_pad, get_numpy_from_buffer, - GStreamerApp, app_callback_class, ) +from detection_pipeline import GStreamerDetectionApp from unix_domain_socket_server import UnixDomainSocketServer -import threading - -# Path for the Unix Domain Socket -SOCKET_PATH = "/tmp/gst_detection.sock" - -# ----------------------------------------------------------------------------------------------- -# Logger Setup -# ----------------------------------------------------------------------------------------------- -log_filename = f"app_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" -logging.basicConfig( - filename=log_filename, - level=logging.DEBUG, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', -) -logger = logging.getLogger(__name__) - - # ----------------------------------------------------------------------------------------------- # User-defined class to be used in the callback function # ----------------------------------------------------------------------------------------------- @@ -103,155 +75,19 @@ def app_callback(pad, info, user_data): frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) user_data.set_frame(frame) - labels = [detection.get_label() for detection in detections] - # Call events server to fire event of a new detection user_data.socket_server.send_event(labels) print(string_to_print) return Gst.PadProbeReturn.OK - -# ----------------------------------------------------------------------------------------------- -# User Gstreamer Application -# ----------------------------------------------------------------------------------------------- - -# This class inherits from the hailo_rpi_common.GStreamerApp class -class GStreamerDetectionApp(GStreamerApp): - def __init__(self, args, user_data): - # Call the parent class constructor - super().__init__(args, user_data) - # Additional initialization code can be added here - # Set Hailo parameters these parameters should be set based on the model used - self.batch_size = 2 - self.network_width = 640 - self.network_height = 640 - self.network_format = "RGB" - nms_score_threshold = 0.3 - nms_iou_threshold = 0.45 - - # Temporary code: new postprocess will be merged to TAPPAS. - # Check if new postprocess so file exists - new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so') - if os.path.exists(new_postprocess_path): - self.default_postprocess_so = new_postprocess_path - else: - self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so') - - if args.hef_path is not None: - self.hef_path = args.hef_path - # Set the HEF file path based on the network - elif args.network == "yolov6n": - self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef') - elif args.network == "yolov8s": - self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef') - elif args.network == "yolox_s_leaky": - self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef') - else: - assert False, "Invalid network type" - - # User-defined label JSON file - if args.labels_json is not None: - self.labels_config = f' config-path={args.labels_json} ' - else: - self.labels_config = '' - - self.app_callback = app_callback - - self.thresholds_str = ( - f"nms-score-threshold={nms_score_threshold} " - f"nms-iou-threshold={nms_iou_threshold} " - f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32" - ) - - # Set the process title - setproctitle.setproctitle("Hailo Detection App") - - self.create_pipeline() - - def get_pipeline_string(self): - if self.source_type == "rpi": - source_element = ( - "libcamerasrc name=src_0 ! " - f"video/x-raw, format={self.network_format}, width=1536, height=864 ! " - + QUEUE("queue_src_scale") - + "videoscale ! " - f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! " - ) - elif self.source_type == "usb": - source_element = ( - f"v4l2src device={self.video_source} name=src_0 ! " - "video/x-raw, width=640, height=480, framerate=30/1 ! " - ) - else: - source_element = ( - f"filesrc location=\"{self.video_source}\" name=src_0 ! " - + QUEUE("queue_dec264") - + " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! " - " video/x-raw, format=I420 ! " - ) - source_element += QUEUE("queue_scale") - source_element += "videoscale n-threads=2 ! " - source_element += QUEUE("queue_src_convert") - source_element += "videoconvert n-threads=3 name=src_convert qos=false ! " - source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! " - - pipeline_string = ( - "hailomuxer name=hmux " - + source_element - + "tee name=t ! " - + QUEUE("bypass_queue", max_size_buffers=20) - + "hmux.sink_0 " - + "t. ! " - + QUEUE("queue_hailonet") - + "videoconvert n-threads=3 ! " - f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! " - + QUEUE("queue_hailofilter") - + f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! " - + QUEUE("queue_hmuc") - + "hmux.sink_1 " - + "hmux. ! " - + QUEUE("queue_hailo_python") - + QUEUE("queue_user_callback") - + "identity name=identity_callback ! " - + QUEUE("queue_hailooverlay") - + "hailooverlay ! " - + QUEUE("queue_videoconvert") - + "videoconvert n-threads=3 qos=false ! " - + QUEUE("queue_hailo_display") - + f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true " - ) - print(pipeline_string) - return pipeline_string - if __name__ == "__main__": # Create an instance of the user app callback class user_data = user_app_callback_class() - # Set up events server socket_server = UnixDomainSocketServer(SOCKET_PATH) socket_server.start() user_data.socket_server = socket_server - - parser = get_default_parser() - # Add additional arguments here - parser.add_argument( - "--network", - default="yolov6n", - choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'], - help="Which Network to use, default is yolov6n", - ) - parser.add_argument( - "--hef-path", - default=None, - help="Path to HEF file", - ) - parser.add_argument( - "--labels-json", - default=None, - help="Path to costume labels JSON file", - ) - args = parser.parse_args() - app = GStreamerDetectionApp(args, user_data) + app = GStreamerDetectionApp(app_callback, user_data) app.run() From 2a41b5480dc3d62315a2749a877919b818cfd5b7 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 19 Oct 2024 23:38:02 +0300 Subject: [PATCH 08/27] fix(haas): add missing event location --- basic_pipelines/detection_service.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/basic_pipelines/detection_service.py b/basic_pipelines/detection_service.py index cbc1e35..55d3f72 100644 --- a/basic_pipelines/detection_service.py +++ b/basic_pipelines/detection_service.py @@ -14,6 +14,9 @@ from unix_domain_socket_server import UnixDomainSocketServer +# Path for the Unix Domain Socket +SOCKET_PATH = "/tmp/gst_detection.sock" + # ----------------------------------------------------------------------------------------------- # User-defined class to be used in the callback function # ----------------------------------------------------------------------------------------------- From 6afe7a0432509ccf1809ef99d5f3f45181556f9f Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Mon, 28 Oct 2024 01:32:51 +0200 Subject: [PATCH 09/27] fix(haas): collect labels for event --- basic_pipelines/detection_service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/basic_pipelines/detection_service.py b/basic_pipelines/detection_service.py index 55d3f72..6e4603d 100644 --- a/basic_pipelines/detection_service.py +++ b/basic_pipelines/detection_service.py @@ -59,9 +59,11 @@ def app_callback(pad, info, user_data): detections = roi.get_objects_typed(hailo.HAILO_DETECTION) # Parse the detections + labels = [] detection_count = 0 for detection in detections: label = detection.get_label() + labels.append(label) bbox = detection.get_bbox() confidence = detection.get_confidence() if label == "person": From c02857fb995ec69234b5dfa9ec5410a541b7d130 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Fri, 29 Nov 2024 23:10:21 +0200 Subject: [PATCH 10/27] feat(unix_domain_socket_server): implement object visibility detection with uptime tracking --- basic_pipelines/unix_domain_socket_server.py | 92 +++++++++++++++----- 1 file changed, 71 insertions(+), 21 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 8613113..c34ab31 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -4,7 +4,8 @@ import threading import logging from datetime import datetime -from deepdiff import DeepDiff # You need to install this package +from deepdiff import DeepDiff +from collections import deque # Configure logging logging.basicConfig(level=logging.INFO) @@ -14,13 +15,18 @@ # Unix Domain Socket Server # ----------------------------------------------------------------------------------------------- class UnixDomainSocketServer(threading.Thread): + UPTIME_WINDOW_SIZE = 100 # Number of events to track per object + APPEAR_THRESHOLD = 0.6 + DISAPPEAR_THRESHOLD = 0.3 + def __init__(self, socket_path): super().__init__() self.socket_path = socket_path self.clients = [] self.lock = threading.Lock() self.running = True - self.last_state = {} # Initialize last_state to keep track of previous data + self.last_state = {} + self.object_logs = {} # To track detections per object # Ensure the socket does not already exist try: @@ -54,6 +60,7 @@ def run(self): def send_event(self, new_data): """ Sends only the differences (diffs) between the new_data and the last sent state. + Implements object uptime for visibility detection. """ # Compute the difference between new_data and last_state diff = DeepDiff(self.last_state, new_data, ignore_order=True).to_dict() @@ -62,19 +69,58 @@ def send_event(self, new_data): logger.info("No changes detected. No event sent.") return # No changes to send - message = json.dumps(diff, default=make_serializable) + "\n" - - with self.lock: - for client in self.clients[:]: - try: - client.sendall(message.encode('utf-8')) - logger.info(f"Sent diff to client: {diff}") - except BrokenPipeError: - logger.warning("Client disconnected.") - self.clients.remove(client) - except Exception as e: - logger.error(f"Error sending data to client: {e}") - self.clients.remove(client) + # Update object logs + detected_objects = {obj['id'] for obj in new_data.get('objects', [])} + for obj_id in detected_objects: + if obj_id not in self.object_logs: + self.object_logs[obj_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) + self.object_logs[obj_id].append(1) # Detected + + # Update logs for objects not detected in this event + for obj_id, log in self.object_logs.items(): + if obj_id not in detected_objects: + log.append(0) # Not detected + + # Determine currently viewable objects based on uptime + visible_objects = [] + for obj_id, log in self.object_logs.items(): + uptime = sum(log) / len(log) + if uptime >= self.APPEAR_THRESHOLD: + visible_objects.append(obj_id) + elif uptime < self.DISAPPEAR_THRESHOLD and obj_id in self.last_state.get('visible_objects', []): + # Fire disappearance event + disappearance_event = {'event': 'object_disappeared', 'object_id': obj_id} + self._send_message(disappearance_event) + + # Update new_data with visible objects + new_data['visible_objects'] = visible_objects + + # Initialize last_sent_visible_objects if not already done + if not hasattr(self, 'last_sent_visible_objects'): + self.last_sent_visible_objects = set() + + # Convert lists to sets for comparison + current_visible = set(visible_objects) + last_visible = self.last_sent_visible_objects + + # Check if there is a change in visible objects + if current_visible != last_visible: + message = json.dumps({'visible_objects': list(current_visible)}, default=make_serializable) + "\n" + + with self.lock: + for client in self.clients[:]: + try: + client.sendall(message.encode('utf-8')) + logger.info(f"Sent visible objects to client: {current_visible}") + except BrokenPipeError: + logger.warning("Client disconnected.") + self.clients.remove(client) + except Exception as e: + logger.error(f"Error sending data to client: {e}") + self.clients.remove(client) + + # Update the last sent visible objects + self.last_sent_visible_objects = current_visible.copy() # Update the last_state to the new_data after sending diffs self.last_state = new_data.copy() @@ -89,6 +135,7 @@ def shutdown(self): except Exception as e: logger.error(f"Error closing client socket: {e}") self.clients.clear() + def make_serializable(obj): if isinstance(obj, set): return list(obj) @@ -111,19 +158,22 @@ def make_serializable(obj): try: # Simulate data updates current_data = { - "timestamp": datetime.utcnow().isoformat(), - "value": 100 + "timestamp": datetime.now(datetime.timezone.utc).isoformat(), + "value": 100, + "objects": [{"id": "object1"}, {"id": "object2"}] } while True: # Simulate a change in data new_value = current_data["value"] + 1 + # Example: object2 becomes object3 new_data = { - "timestamp": datetime.utcnow().isoformat(), - "value": new_value + "timestamp": datetime.now(datetime.timezone.utc).isoformat(), + "value": new_value, + "objects": [{"id": "object1"}, {"id": "object3"}] } - # Send only the diffs + # Send only the diffs with uptime processing server.send_event(new_data) # Wait for a while before next update @@ -140,4 +190,4 @@ def make_serializable(obj): try: os.unlink(SOCKET_PATH) except FileNotFoundError: - pass + pass \ No newline at end of file From 1bffff6bd64bac4c365bd8c495a533f75d89ccf4 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 30 Nov 2024 01:20:10 +0200 Subject: [PATCH 11/27] fix(unix_domain_socket_server): add list support for new_data on send_event --- basic_pipelines/unix_domain_socket_server.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index c34ab31..895ccf2 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -61,13 +61,27 @@ def send_event(self, new_data): """ Sends only the differences (diffs) between the new_data and the last sent state. Implements object uptime for visibility detection. + + Args: + new_data: Dict or List of objects with 'id' field """ - # Compute the difference between new_data and last_state + # Convert list to dict if needed + if isinstance(new_data, list): + new_data = {"objects": new_data} + elif not isinstance(new_data, dict): + logger.error(f"Invalid data type for new_data: {type(new_data)}") + return + + # Ensure objects key exists + if 'objects' not in new_data: + new_data['objects'] = [] + + # Rest of the method remains the same diff = DeepDiff(self.last_state, new_data, ignore_order=True).to_dict() - + if not diff: logger.info("No changes detected. No event sent.") - return # No changes to send + return # Update object logs detected_objects = {obj['id'] for obj in new_data.get('objects', [])} From 0c7cbf775cb1df8c80121b551999e88701d8fdf0 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 30 Nov 2024 01:29:39 +0200 Subject: [PATCH 12/27] fix(unix_domain_socket_server): accept new_data as a list of strings --- basic_pipelines/unix_domain_socket_server.py | 159 +++++++++---------- 1 file changed, 71 insertions(+), 88 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 895ccf2..c75c3f0 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -57,98 +57,81 @@ def run(self): self.server.close() logger.info("Unix Domain Socket Server shut down.") - def send_event(self, new_data): - """ - Sends only the differences (diffs) between the new_data and the last sent state. - Implements object uptime for visibility detection. +def send_event(self, new_data): + """ + Sends only the differences (diffs) between the new_data and the last sent state. + Implements object uptime for visibility detection. + """ + # Compute the difference between new_data and last_state + diff = DeepDiff(self.last_state, new_data, ignore_order=True).to_dict() + + if not diff: + logger.info("No changes detected. No event sent.") + return # No changes to send + + # Update object logs + detected_objects = set(new_data) + for obj_id in detected_objects: + if obj_id not in self.object_logs: + self.object_logs[obj_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) + self.object_logs[obj_id].append(1) # Detected + + # Update logs for objects not detected in this event + for obj_id, log in self.object_logs.items(): + if obj_id not in detected_objects: + log.append(0) # Not detected + + # Determine currently viewable objects based on uptime + visible_objects = [] + for obj_id, log in self.object_logs.items(): + uptime = sum(log) / len(log) + if uptime >= self.APPEAR_THRESHOLD: + visible_objects.append(obj_id) + elif uptime < self.DISAPPEAR_THRESHOLD and obj_id in self.last_state: + # Fire disappearance event + disappearance_event = {'event': 'object_disappeared', 'object_id': obj_id} + self._send_message(disappearance_event) + + # Initialize last_sent_visible_objects if not already done + if not hasattr(self, 'last_sent_visible_objects'): + self.last_sent_visible_objects = set() + + # Convert lists to sets for comparison + current_visible = set(visible_objects) + last_visible = self.last_sent_visible_objects + + # Check if there is a change in visible objects + if current_visible != last_visible: + message = json.dumps({'visible_objects': list(current_visible)}, default=make_serializable) + "\n" - Args: - new_data: Dict or List of objects with 'id' field - """ - # Convert list to dict if needed - if isinstance(new_data, list): - new_data = {"objects": new_data} - elif not isinstance(new_data, dict): - logger.error(f"Invalid data type for new_data: {type(new_data)}") - return - - # Ensure objects key exists - if 'objects' not in new_data: - new_data['objects'] = [] - - # Rest of the method remains the same - diff = DeepDiff(self.last_state, new_data, ignore_order=True).to_dict() - - if not diff: - logger.info("No changes detected. No event sent.") - return - - # Update object logs - detected_objects = {obj['id'] for obj in new_data.get('objects', [])} - for obj_id in detected_objects: - if obj_id not in self.object_logs: - self.object_logs[obj_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) - self.object_logs[obj_id].append(1) # Detected - - # Update logs for objects not detected in this event - for obj_id, log in self.object_logs.items(): - if obj_id not in detected_objects: - log.append(0) # Not detected - - # Determine currently viewable objects based on uptime - visible_objects = [] - for obj_id, log in self.object_logs.items(): - uptime = sum(log) / len(log) - if uptime >= self.APPEAR_THRESHOLD: - visible_objects.append(obj_id) - elif uptime < self.DISAPPEAR_THRESHOLD and obj_id in self.last_state.get('visible_objects', []): - # Fire disappearance event - disappearance_event = {'event': 'object_disappeared', 'object_id': obj_id} - self._send_message(disappearance_event) - - # Update new_data with visible objects - new_data['visible_objects'] = visible_objects - - # Initialize last_sent_visible_objects if not already done - if not hasattr(self, 'last_sent_visible_objects'): - self.last_sent_visible_objects = set() - - # Convert lists to sets for comparison - current_visible = set(visible_objects) - last_visible = self.last_sent_visible_objects - - # Check if there is a change in visible objects - if current_visible != last_visible: - message = json.dumps({'visible_objects': list(current_visible)}, default=make_serializable) + "\n" - - with self.lock: - for client in self.clients[:]: - try: - client.sendall(message.encode('utf-8')) - logger.info(f"Sent visible objects to client: {current_visible}") - except BrokenPipeError: - logger.warning("Client disconnected.") - self.clients.remove(client) - except Exception as e: - logger.error(f"Error sending data to client: {e}") - self.clients.remove(client) - - # Update the last sent visible objects - self.last_sent_visible_objects = current_visible.copy() - - # Update the last_state to the new_data after sending diffs - self.last_state = new_data.copy() - - def shutdown(self): - logger.info("Shutting down Unix Domain Socket Server") - self.running = False with self.lock: - for client in self.clients: + for client in self.clients[:]: try: - client.close() + client.sendall(message.encode('utf-8')) + logger.info(f"Sent visible objects to client: {current_visible}") + except BrokenPipeError: + logger.warning("Client disconnected.") + self.clients.remove(client) except Exception as e: - logger.error(f"Error closing client socket: {e}") - self.clients.clear() + logger.error(f"Error sending data to client: {e}") + self.clients.remove(client) + + # Update the last sent visible objects + self.last_sent_visible_objects = current_visible.copy() + + # Update the last_state to the new_data after sending diffs + self.last_state = new_data.copy() + +def shutdown(self): + logger.info("Shutting down Unix Domain Socket Server") + self.running = False + with self.lock: + for client in self.clients: + try: + client.close() + except Exception as e: + logger.error(f"Error closing client socket: {e}") + self.clients.clear() def make_serializable(obj): if isinstance(obj, set): From e453a6225b1aed5688ef3869732b930536797584 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 30 Nov 2024 01:30:57 +0200 Subject: [PATCH 13/27] fix(unix_domain_socket_server): missing tab --- basic_pipelines/unix_domain_socket_server.py | 128 +++++++++---------- 1 file changed, 64 insertions(+), 64 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index c75c3f0..5d68dde 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -57,70 +57,70 @@ def run(self): self.server.close() logger.info("Unix Domain Socket Server shut down.") -def send_event(self, new_data): - """ - Sends only the differences (diffs) between the new_data and the last sent state. - Implements object uptime for visibility detection. - """ - # Compute the difference between new_data and last_state - diff = DeepDiff(self.last_state, new_data, ignore_order=True).to_dict() - - if not diff: - logger.info("No changes detected. No event sent.") - return # No changes to send - - # Update object logs - detected_objects = set(new_data) - for obj_id in detected_objects: - if obj_id not in self.object_logs: - self.object_logs[obj_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) - self.object_logs[obj_id].append(1) # Detected - - # Update logs for objects not detected in this event - for obj_id, log in self.object_logs.items(): - if obj_id not in detected_objects: - log.append(0) # Not detected - - # Determine currently viewable objects based on uptime - visible_objects = [] - for obj_id, log in self.object_logs.items(): - uptime = sum(log) / len(log) - if uptime >= self.APPEAR_THRESHOLD: - visible_objects.append(obj_id) - elif uptime < self.DISAPPEAR_THRESHOLD and obj_id in self.last_state: - # Fire disappearance event - disappearance_event = {'event': 'object_disappeared', 'object_id': obj_id} - self._send_message(disappearance_event) - - # Initialize last_sent_visible_objects if not already done - if not hasattr(self, 'last_sent_visible_objects'): - self.last_sent_visible_objects = set() - - # Convert lists to sets for comparison - current_visible = set(visible_objects) - last_visible = self.last_sent_visible_objects - - # Check if there is a change in visible objects - if current_visible != last_visible: - message = json.dumps({'visible_objects': list(current_visible)}, default=make_serializable) + "\n" - - with self.lock: - for client in self.clients[:]: - try: - client.sendall(message.encode('utf-8')) - logger.info(f"Sent visible objects to client: {current_visible}") - except BrokenPipeError: - logger.warning("Client disconnected.") - self.clients.remove(client) - except Exception as e: - logger.error(f"Error sending data to client: {e}") - self.clients.remove(client) - - # Update the last sent visible objects - self.last_sent_visible_objects = current_visible.copy() - - # Update the last_state to the new_data after sending diffs - self.last_state = new_data.copy() + def send_event(self, new_data): + """ + Sends only the differences (diffs) between the new_data and the last sent state. + Implements object uptime for visibility detection. + """ + # Compute the difference between new_data and last_state + diff = DeepDiff(self.last_state, new_data, ignore_order=True).to_dict() + + if not diff: + logger.info("No changes detected. No event sent.") + return # No changes to send + + # Update object logs + detected_objects = set(new_data) + for obj_id in detected_objects: + if obj_id not in self.object_logs: + self.object_logs[obj_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) + self.object_logs[obj_id].append(1) # Detected + + # Update logs for objects not detected in this event + for obj_id, log in self.object_logs.items(): + if obj_id not in detected_objects: + log.append(0) # Not detected + + # Determine currently viewable objects based on uptime + visible_objects = [] + for obj_id, log in self.object_logs.items(): + uptime = sum(log) / len(log) + if uptime >= self.APPEAR_THRESHOLD: + visible_objects.append(obj_id) + elif uptime < self.DISAPPEAR_THRESHOLD and obj_id in self.last_state: + # Fire disappearance event + disappearance_event = {'event': 'object_disappeared', 'object_id': obj_id} + self._send_message(disappearance_event) + + # Initialize last_sent_visible_objects if not already done + if not hasattr(self, 'last_sent_visible_objects'): + self.last_sent_visible_objects = set() + + # Convert lists to sets for comparison + current_visible = set(visible_objects) + last_visible = self.last_sent_visible_objects + + # Check if there is a change in visible objects + if current_visible != last_visible: + message = json.dumps({'visible_objects': list(current_visible)}, default=make_serializable) + "\n" + + with self.lock: + for client in self.clients[:]: + try: + client.sendall(message.encode('utf-8')) + logger.info(f"Sent visible objects to client: {current_visible}") + except BrokenPipeError: + logger.warning("Client disconnected.") + self.clients.remove(client) + except Exception as e: + logger.error(f"Error sending data to client: {e}") + self.clients.remove(client) + + # Update the last sent visible objects + self.last_sent_visible_objects = current_visible.copy() + + # Update the last_state to the new_data after sending diffs + self.last_state = new_data.copy() def shutdown(self): logger.info("Shutting down Unix Domain Socket Server") From 97bf572ab29ea03a9ed3a45bd6153d95bb9831f1 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 30 Nov 2024 01:40:37 +0200 Subject: [PATCH 14/27] fix(unix_domain_socket_server): add missing _send_message method --- basic_pipelines/unix_domain_socket_server.py | 46 +++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 5d68dde..9151c83 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -122,22 +122,36 @@ def send_event(self, new_data): # Update the last_state to the new_data after sending diffs self.last_state = new_data.copy() -def shutdown(self): - logger.info("Shutting down Unix Domain Socket Server") - self.running = False - with self.lock: - for client in self.clients: - try: - client.close() - except Exception as e: - logger.error(f"Error closing client socket: {e}") - self.clients.clear() - -def make_serializable(obj): - if isinstance(obj, set): - return list(obj) - # Add other custom serialization logic as needed - return str(obj) # Fallback to string representation + def _send_message(self, message): + message_str = json.dumps(message) + "\n" + with self.lock: + for client in self.clients[:]: + try: + client.sendall(message_str.encode('utf-8')) + logger.info(f"Sent event to client: {message}") + except BrokenPipeError: + logger.warning("Client disconnected.") + self.clients.remove(client) + except Exception as e: + logger.error(f"Error sending event to client: {e}") + self.clients.remove(client) + + def shutdown(self): + logger.info("Shutting down Unix Domain Socket Server") + self.running = False + with self.lock: + for client in self.clients: + try: + client.close() + except Exception as e: + logger.error(f"Error closing client socket: {e}") + self.clients.clear() + + def make_serializable(obj): + if isinstance(obj, set): + return list(obj) + # Add other custom serialization logic as needed + return str(obj) # Fallback to string representation # ----------------------------------------------------------------------------------------------- # Example Usage From b70ac9025bb20eaad342dd8731dee31c2b711090 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 30 Nov 2024 01:42:39 +0200 Subject: [PATCH 15/27] fix(unix_domain_socket_server): revert shutdown and make_serializable tab --- basic_pipelines/unix_domain_socket_server.py | 32 ++++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 9151c83..36b74f6 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -136,22 +136,22 @@ def _send_message(self, message): logger.error(f"Error sending event to client: {e}") self.clients.remove(client) - def shutdown(self): - logger.info("Shutting down Unix Domain Socket Server") - self.running = False - with self.lock: - for client in self.clients: - try: - client.close() - except Exception as e: - logger.error(f"Error closing client socket: {e}") - self.clients.clear() - - def make_serializable(obj): - if isinstance(obj, set): - return list(obj) - # Add other custom serialization logic as needed - return str(obj) # Fallback to string representation +def shutdown(self): + logger.info("Shutting down Unix Domain Socket Server") + self.running = False + with self.lock: + for client in self.clients: + try: + client.close() + except Exception as e: + logger.error(f"Error closing client socket: {e}") + self.clients.clear() + +def make_serializable(obj): + if isinstance(obj, set): + return list(obj) + # Add other custom serialization logic as needed + return str(obj) # Fallback to string representation # ----------------------------------------------------------------------------------------------- # Example Usage From 90ceddc9053c14cd4bfc5ce3f962bde8ace31434 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 30 Nov 2024 01:57:01 +0200 Subject: [PATCH 16/27] fix(unix_domain_socket_server): event are not sent on time --- basic_pipelines/unix_domain_socket_server.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 36b74f6..1691f69 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -70,16 +70,16 @@ def send_event(self, new_data): return # No changes to send # Update object logs - detected_objects = set(new_data) + detected_objects = set(obj['id'] for obj in new_data.get('objects', [])) for obj_id in detected_objects: if obj_id not in self.object_logs: self.object_logs[obj_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) self.object_logs[obj_id].append(1) # Detected # Update logs for objects not detected in this event - for obj_id, log in self.object_logs.items(): + for obj_id in list(self.object_logs.keys()): if obj_id not in detected_objects: - log.append(0) # Not detected + self.object_logs[obj_id].append(0) # Not detected # Determine currently viewable objects based on uptime visible_objects = [] @@ -87,10 +87,11 @@ def send_event(self, new_data): uptime = sum(log) / len(log) if uptime >= self.APPEAR_THRESHOLD: visible_objects.append(obj_id) - elif uptime < self.DISAPPEAR_THRESHOLD and obj_id in self.last_state: + elif uptime < self.DISAPPEAR_THRESHOLD and obj_id in self.last_state.get('objects', [{}]): # Fire disappearance event disappearance_event = {'event': 'object_disappeared', 'object_id': obj_id} self._send_message(disappearance_event) + del self.object_logs[obj_id] # Remove object from logs # Initialize last_sent_visible_objects if not already done if not hasattr(self, 'last_sent_visible_objects'): @@ -121,7 +122,7 @@ def send_event(self, new_data): # Update the last_state to the new_data after sending diffs self.last_state = new_data.copy() - + def _send_message(self, message): message_str = json.dumps(message) + "\n" with self.lock: From 149c3e55948181895fabf317209e592d45d01b9a Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sat, 30 Nov 2024 02:00:13 +0200 Subject: [PATCH 17/27] fix(unix_domain_socket_cervice): fixed input as list --- basic_pipelines/unix_domain_socket_server.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 1691f69..ca1a92d 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -62,15 +62,18 @@ def send_event(self, new_data): Sends only the differences (diffs) between the new_data and the last sent state. Implements object uptime for visibility detection. """ + # Convert new_data list to dictionary format + new_data_dict = {'objects': [{'id': obj_id} for obj_id in new_data]} + # Compute the difference between new_data and last_state - diff = DeepDiff(self.last_state, new_data, ignore_order=True).to_dict() + diff = DeepDiff(self.last_state, new_data_dict, ignore_order=True).to_dict() if not diff: logger.info("No changes detected. No event sent.") return # No changes to send # Update object logs - detected_objects = set(obj['id'] for obj in new_data.get('objects', [])) + detected_objects = set(obj['id'] for obj in new_data_dict.get('objects', [])) for obj_id in detected_objects: if obj_id not in self.object_logs: self.object_logs[obj_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) @@ -121,7 +124,7 @@ def send_event(self, new_data): self.last_sent_visible_objects = current_visible.copy() # Update the last_state to the new_data after sending diffs - self.last_state = new_data.copy() + self.last_state = new_data_dict.copy() def _send_message(self, message): message_str = json.dumps(message) + "\n" From b154feee646ed4ca0979520e74ba4ca2415b6d2d Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sun, 1 Dec 2024 01:54:07 +0200 Subject: [PATCH 18/27] fix(unix_domain_socket_server): refactor send_event method and improve object visibility handling --- basic_pipelines/unix_domain_socket_server.py | 165 ++++++++++++------- 1 file changed, 104 insertions(+), 61 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index ca1a92d..83f5faa 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -57,74 +57,117 @@ def run(self): self.server.close() logger.info("Unix Domain Socket Server shut down.") - def send_event(self, new_data): + def send_event(self, event_payload): """ Sends only the differences (diffs) between the new_data and the last sent state. Implements object uptime for visibility detection. """ - # Convert new_data list to dictionary format - new_data_dict = {'objects': [{'id': obj_id} for obj_id in new_data]} - - # Compute the difference between new_data and last_state - diff = DeepDiff(self.last_state, new_data_dict, ignore_order=True).to_dict() - - if not diff: + new_state = self._event_payload_to_state(event_payload) + differences = self.compute_differences(new_state) + + if not differences: logger.info("No changes detected. No event sent.") - return # No changes to send - - # Update object logs - detected_objects = set(obj['id'] for obj in new_data_dict.get('objects', [])) - for obj_id in detected_objects: - if obj_id not in self.object_logs: - self.object_logs[obj_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) - self.object_logs[obj_id].append(1) # Detected - - # Update logs for objects not detected in this event - for obj_id in list(self.object_logs.keys()): - if obj_id not in detected_objects: - self.object_logs[obj_id].append(0) # Not detected - - # Determine currently viewable objects based on uptime - visible_objects = [] - for obj_id, log in self.object_logs.items(): - uptime = sum(log) / len(log) - if uptime >= self.APPEAR_THRESHOLD: - visible_objects.append(obj_id) - elif uptime < self.DISAPPEAR_THRESHOLD and obj_id in self.last_state.get('objects', [{}]): - # Fire disappearance event - disappearance_event = {'event': 'object_disappeared', 'object_id': obj_id} - self._send_message(disappearance_event) - del self.object_logs[obj_id] # Remove object from logs - - # Initialize last_sent_visible_objects if not already done + return + + self.update_object_logs(new_state) + currently_visible_objects = self.determine_visible_objects() + + self.send_visible_objects(currently_visible_objects) + + self.update_last_state(new_state) + + def _event_payload_to_state(self, event_payload): + """ + Converts new_data list to dictionary format with full word keys. + """ + return {'objects': [{'id': object_id} for object_id in event_payload]} + + def compute_differences(self, new_state): + """ + Computes the difference between the new_data and last_state. + """ + return DeepDiff(self.last_state, new_state, ignore_order=True) != {} + + def update_object_logs(self, new_state): + """ + Updates object logs based on detected object IDs. + """ + objects = new_state.get('objects', []) + detected_object_ids = set(object['id'] for object in objects) + for object_id in detected_object_ids: + if object_id not in self.object_logs: + self.object_logs[object_id] = deque(maxlen=self.UPTIME_WINDOW_SIZE) + self.object_logs[object_id].append(1) # Object detected + + for object_id in list(self.object_logs.keys()): + if object_id not in detected_object_ids: + self.object_logs[object_id].append(0) # Object not detected + + # Remove objects that are not detected for a long time + for object_id, log in self.object_logs.items(): + if len(log) == self.UPTIME_WINDOW_SIZE and sum(log) == 0: + del self.object_logs[object_id] + + return detected_object_ids + + def determine_visible_objects(self): + """ + Determines currently viewable objects based on uptime. + An object becomes visible once uptime_ratio >= APPEAR_THRESHOLD and remains visible until uptime_ratio < DISAPPEAR_THRESHOLD. + """ + for object_id, log in self.object_logs.items(): + uptime_ratio = sum(log) / len(log) + if object_id in self.last_sent_visible_objects: + if uptime_ratio < self.DISAPPEAR_THRESHOLD: + disappearance_event = {'event': 'object_disappeared', 'object_id': object_id} + self.send_message_to_client(disappearance_event) + self.last_sent_visible_objects.remove(object_id) + else: + if uptime_ratio >= self.APPEAR_THRESHOLD: + appearance_event = {'event': 'object_appeared', 'object_id': object_id} + self.send_message_to_client(appearance_event) + self.last_sent_visible_objects.add(object_id) + return list(self.last_sent_visible_objects) + + def _object_existed(self, object_id): + return object_id in self.last_state.get('objects', [{}])[0] + + def has_visible_objects_changed(self, currently_visible_objects): + """ + Checks if there is a change in visible objects. + """ if not hasattr(self, 'last_sent_visible_objects'): self.last_sent_visible_objects = set() - - # Convert lists to sets for comparison - current_visible = set(visible_objects) - last_visible = self.last_sent_visible_objects - - # Check if there is a change in visible objects - if current_visible != last_visible: - message = json.dumps({'visible_objects': list(current_visible)}, default=make_serializable) + "\n" - - with self.lock: - for client in self.clients[:]: - try: - client.sendall(message.encode('utf-8')) - logger.info(f"Sent visible objects to client: {current_visible}") - except BrokenPipeError: - logger.warning("Client disconnected.") - self.clients.remove(client) - except Exception as e: - logger.error(f"Error sending data to client: {e}") - self.clients.remove(client) - - # Update the last sent visible objects - self.last_sent_visible_objects = current_visible.copy() - - # Update the last_state to the new_data after sending diffs - self.last_state = new_data_dict.copy() + current_visible_set = set(currently_visible_objects) + last_visible_set = self.last_sent_visible_objects + if current_visible_set != last_visible_set: + self.current_visible_set = current_visible_set + return True + return False + + def send_visible_objects(self, currently_visible_objects): + """ + Sends the list of currently visible objects to clients. + """ + message = json.dumps({'visible_objects': list(self.current_visible_set)}, default=make_serializable) + "\n" + with self.lock: + for client_connection in self.clients[:]: + try: + client_connection.sendall(message.encode('utf-8')) + logger.info(f"Sent visible objects to client: {self.current_visible_set}") + except BrokenPipeError: + logger.warning("Client disconnected.") + self.clients.remove(client_connection) + except Exception as exception_error: + logger.error(f"Error sending data to client: {exception_error}") + self.clients.remove(client_connection) + self.last_sent_visible_objects = self.current_visible_set.copy() + + def update_last_state(self, new_data_dictionary): + """ + Updates the last_state to the new_data after sending diffs. + """ + self.last_state = new_data_dictionary.copy() def _send_message(self, message): message_str = json.dumps(message) + "\n" From dc1642f23cfae579fc9cb69cd40aa7d621d031a2 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sun, 1 Dec 2024 01:57:53 +0200 Subject: [PATCH 19/27] fix(unix_domain_socket_server): init last sent visible objects --- basic_pipelines/unix_domain_socket_server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 83f5faa..e6cd281 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -27,6 +27,8 @@ def __init__(self, socket_path): self.running = True self.last_state = {} self.object_logs = {} # To track detections per object + self.last_sent_visible_objects = set() # To track visible objects + # Ensure the socket does not already exist try: From 6e02f3355bd144f0eafcc0e60491db8e4100d4e2 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sun, 1 Dec 2024 01:58:53 +0200 Subject: [PATCH 20/27] fix(unix_domain_socket_server): update call to _send_message --- basic_pipelines/unix_domain_socket_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index e6cd281..568d30d 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -122,12 +122,12 @@ def determine_visible_objects(self): if object_id in self.last_sent_visible_objects: if uptime_ratio < self.DISAPPEAR_THRESHOLD: disappearance_event = {'event': 'object_disappeared', 'object_id': object_id} - self.send_message_to_client(disappearance_event) + self._send_message(disappearance_event) self.last_sent_visible_objects.remove(object_id) else: if uptime_ratio >= self.APPEAR_THRESHOLD: appearance_event = {'event': 'object_appeared', 'object_id': object_id} - self.send_message_to_client(appearance_event) + self._send_message(appearance_event) self.last_sent_visible_objects.add(object_id) return list(self.last_sent_visible_objects) From 04f695c2f4084fa6ac35fb70b3206dc698ff1b07 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sun, 1 Dec 2024 02:00:16 +0200 Subject: [PATCH 21/27] fix(unix_domain_socket_server): init current_visible_set --- basic_pipelines/unix_domain_socket_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 568d30d..ed8aa56 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -28,7 +28,7 @@ def __init__(self, socket_path): self.last_state = {} self.object_logs = {} # To track detections per object self.last_sent_visible_objects = set() # To track visible objects - + self.current_visible_set = set() # To track visible objects # Ensure the socket does not already exist try: From 8355c2c379356e59825749c814a2006b4c05a398 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sun, 1 Dec 2024 02:08:39 +0200 Subject: [PATCH 22/27] fix(unix_domain_socket_server): improve visibility detection and message sending logic --- basic_pipelines/unix_domain_socket_server.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index ed8aa56..5fad095 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -65,15 +65,20 @@ def send_event(self, event_payload): Implements object uptime for visibility detection. """ new_state = self._event_payload_to_state(event_payload) - differences = self.compute_differences(new_state) + #differences = self.compute_differences(new_state) - if not differences: - logger.info("No changes detected. No event sent.") - return + #if not differences: + # logger.info("No changes detected. No event sent.") + # return self.update_object_logs(new_state) + previously_visible_objects = self.last_sent_visible_objects.copy() currently_visible_objects = self.determine_visible_objects() - + visible_objects_changed = DeepDiff(previously_visible_objects, currently_visible_objects) != {} + if not visible_objects_changed: + logger.info("No changes in visible objects. No event sent.") + return + self.send_visible_objects(currently_visible_objects) self.update_last_state(new_state) @@ -155,7 +160,8 @@ def send_visible_objects(self, currently_visible_objects): with self.lock: for client_connection in self.clients[:]: try: - client_connection.sendall(message.encode('utf-8')) + self._send_message(message) + #client_connection.sendall(message.encode('utf-8')) logger.info(f"Sent visible objects to client: {self.current_visible_set}") except BrokenPipeError: logger.warning("Client disconnected.") From 4897f497065c9eb8d27daa477523acae72de11f3 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sun, 1 Dec 2024 02:19:10 +0200 Subject: [PATCH 23/27] fix(unix_domain_socket_server): simplify send_visible_objects method and update visibility handling --- basic_pipelines/unix_domain_socket_server.py | 32 ++++++++------------ 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 5fad095..294f097 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -79,7 +79,7 @@ def send_event(self, event_payload): logger.info("No changes in visible objects. No event sent.") return - self.send_visible_objects(currently_visible_objects) + self.send_visible_objects() self.update_last_state(new_state) @@ -122,19 +122,23 @@ def determine_visible_objects(self): Determines currently viewable objects based on uptime. An object becomes visible once uptime_ratio >= APPEAR_THRESHOLD and remains visible until uptime_ratio < DISAPPEAR_THRESHOLD. """ + current_visible_objects = self.last_sent_visible_objects.copy() for object_id, log in self.object_logs.items(): uptime_ratio = sum(log) / len(log) if object_id in self.last_sent_visible_objects: if uptime_ratio < self.DISAPPEAR_THRESHOLD: disappearance_event = {'event': 'object_disappeared', 'object_id': object_id} self._send_message(disappearance_event) - self.last_sent_visible_objects.remove(object_id) + current_visible_objects.remove(object_id) else: if uptime_ratio >= self.APPEAR_THRESHOLD: appearance_event = {'event': 'object_appeared', 'object_id': object_id} self._send_message(appearance_event) - self.last_sent_visible_objects.add(object_id) - return list(self.last_sent_visible_objects) + current_visible_objects.add(object_id) + + self.current_visible_set = current_visible_objects.copy() + + return list(current_visible_objects) def _object_existed(self, object_id): return object_id in self.last_state.get('objects', [{}])[0] @@ -152,30 +156,20 @@ def has_visible_objects_changed(self, currently_visible_objects): return True return False - def send_visible_objects(self, currently_visible_objects): + def send_visible_objects(self): """ Sends the list of currently visible objects to clients. """ - message = json.dumps({'visible_objects': list(self.current_visible_set)}, default=make_serializable) + "\n" - with self.lock: - for client_connection in self.clients[:]: - try: - self._send_message(message) - #client_connection.sendall(message.encode('utf-8')) - logger.info(f"Sent visible objects to client: {self.current_visible_set}") - except BrokenPipeError: - logger.warning("Client disconnected.") - self.clients.remove(client_connection) - except Exception as exception_error: - logger.error(f"Error sending data to client: {exception_error}") - self.clients.remove(client_connection) - self.last_sent_visible_objects = self.current_visible_set.copy() + message = json.dumps({'visible_objects': list(self.current_visible_set)}, default=make_serializable) + self._send_message(message) def update_last_state(self, new_data_dictionary): """ Updates the last_state to the new_data after sending diffs. """ self.last_state = new_data_dictionary.copy() + self.last_sent_visible_objects = self.current_visible_set.copy() + def _send_message(self, message): message_str = json.dumps(message) + "\n" From dfd14a6ed1daa60b13137257eaec5a6ca364bd01 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sun, 1 Dec 2024 02:24:02 +0200 Subject: [PATCH 24/27] fix(unix_domain_socket_server): optimize visibility change detection logic --- basic_pipelines/unix_domain_socket_server.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 294f097..5999a4b 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -72,9 +72,8 @@ def send_event(self, event_payload): # return self.update_object_logs(new_state) - previously_visible_objects = self.last_sent_visible_objects.copy() - currently_visible_objects = self.determine_visible_objects() - visible_objects_changed = DeepDiff(previously_visible_objects, currently_visible_objects) != {} + self.determine_visible_objects() + visible_objects_changed = DeepDiff(self.last_sent_visible_objects, self.current_visible_set) != {} if not visible_objects_changed: logger.info("No changes in visible objects. No event sent.") return From db0389d4dd49ef3c570f84f9afb38d176705bf06 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Sun, 1 Dec 2024 02:25:41 +0200 Subject: [PATCH 25/27] fix(unix_domain_socket_server): reduce uptime window size for event tracking --- basic_pipelines/unix_domain_socket_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/basic_pipelines/unix_domain_socket_server.py b/basic_pipelines/unix_domain_socket_server.py index 5999a4b..c06f4ca 100644 --- a/basic_pipelines/unix_domain_socket_server.py +++ b/basic_pipelines/unix_domain_socket_server.py @@ -15,7 +15,7 @@ # Unix Domain Socket Server # ----------------------------------------------------------------------------------------------- class UnixDomainSocketServer(threading.Thread): - UPTIME_WINDOW_SIZE = 100 # Number of events to track per object + UPTIME_WINDOW_SIZE = 30 # Number of events to track per object, 10 per second APPEAR_THRESHOLD = 0.6 DISAPPEAR_THRESHOLD = 0.3 From 8705cf2b77efb06f43cdfa17ad91e7fa325637a1 Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Wed, 25 Dec 2024 00:37:21 +0200 Subject: [PATCH 26/27] fix(community): move community project to appropriate folder --- .../UnixDomainSocketIntegration/README.md | 46 +++++++++++++++++++ .../detection_service.py | 4 ++ .../requirements.txt | 5 ++ .../test_tools}/event_listener.py | 0 .../unix_domain_socket_server.py | 0 requirements.txt | 4 +- 6 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 community_projects/UnixDomainSocketIntegration/README.md rename {basic_pipelines => community_projects/UnixDomainSocketIntegration}/detection_service.py (98%) create mode 100644 community_projects/UnixDomainSocketIntegration/requirements.txt rename {test_tools => community_projects/UnixDomainSocketIntegration/test_tools}/event_listener.py (100%) mode change 100755 => 100644 rename {basic_pipelines => community_projects/UnixDomainSocketIntegration}/unix_domain_socket_server.py (100%) diff --git a/community_projects/UnixDomainSocketIntegration/README.md b/community_projects/UnixDomainSocketIntegration/README.md new file mode 100644 index 0000000..707bede --- /dev/null +++ b/community_projects/UnixDomainSocketIntegration/README.md @@ -0,0 +1,46 @@ +# Hailo UnixDomainSocketIntegration example + +This exmaple is based on the detection pipeline. It is an example for converting a pipeline to event-based integration. +The demo model is Object detection, but this can be applied to any model. + +The server measures 'uptime' of each object, and takes only objects with enough 'uptime' (To ignore flicering due bad detection) + +## Installation +Follow the installation flow in the main README file, and then continue following this README file. + +### Enable SPI +```bash +sudo raspi-config +``` + +- 3 Interface Options +- I4 SPI +- Yes +- reboot + + +### Navigate to the repository directory: +```bash +cd hailo-rpi5-examples +``` + +### Environment Configuration (Required for Each New Terminal Session) +Ensure your environment is set up correctly by sourcing the provided script. This script sets the required environment variables and activates the Hailo virtual environment. If the virtual environment does not exist, it will be created automatically. +```bash +source setup_env.sh +``` +### Navigate to the example directory: +```bash +cd community_projects/UnixDomainSocketIntegration/ +``` +### Requirements Installation +Within the activated virtual environment, install the necessary Python packages: +```bash +pip install -r requirements.txt +``` + +### To Run the Simple Example: +```bash +python detection_service.py +``` +- To close the application, press `Ctrl+C`. diff --git a/basic_pipelines/detection_service.py b/community_projects/UnixDomainSocketIntegration/detection_service.py similarity index 98% rename from basic_pipelines/detection_service.py rename to community_projects/UnixDomainSocketIntegration/detection_service.py index 6e4603d..2dd275e 100644 --- a/basic_pipelines/detection_service.py +++ b/community_projects/UnixDomainSocketIntegration/detection_service.py @@ -5,6 +5,10 @@ import numpy as np import cv2 import hailo +import sys + +sys.path.append('../../basic_pipelines') + from hailo_rpi_common import ( get_caps_from_pad, get_numpy_from_buffer, diff --git a/community_projects/UnixDomainSocketIntegration/requirements.txt b/community_projects/UnixDomainSocketIntegration/requirements.txt new file mode 100644 index 0000000..db48d9c --- /dev/null +++ b/community_projects/UnixDomainSocketIntegration/requirements.txt @@ -0,0 +1,5 @@ +numpy<2.0.0 +setproctitle +opencv-python +python-dotenv +DeepDiff \ No newline at end of file diff --git a/test_tools/event_listener.py b/community_projects/UnixDomainSocketIntegration/test_tools/event_listener.py old mode 100755 new mode 100644 similarity index 100% rename from test_tools/event_listener.py rename to community_projects/UnixDomainSocketIntegration/test_tools/event_listener.py diff --git a/basic_pipelines/unix_domain_socket_server.py b/community_projects/UnixDomainSocketIntegration/unix_domain_socket_server.py similarity index 100% rename from basic_pipelines/unix_domain_socket_server.py rename to community_projects/UnixDomainSocketIntegration/unix_domain_socket_server.py diff --git a/requirements.txt b/requirements.txt index d213671..e5fab20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,3 @@ numpy<2.0.0 setproctitle -opencv-python -python-dotenv -DeepDiff +opencv-python \ No newline at end of file From eb4e8daaf71ed1f36a1100b96e9385137310fd9c Mon Sep 17 00:00:00 2001 From: Ori Nachum Date: Wed, 25 Dec 2024 00:39:05 +0200 Subject: [PATCH 27/27] fix(docs): Remove community project documentation --- doc/basic-pipelines.md | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/doc/basic-pipelines.md b/doc/basic-pipelines.md index 3ad249b..2f69b1d 100644 --- a/doc/basic-pipelines.md +++ b/doc/basic-pipelines.md @@ -384,16 +384,6 @@ This creates a file named `pipeline.dot` in the `basic_pipelines` directory. ![detection_pipeline](images/detection_pipeline.png) *Tip: Right-click the image and select "Open image in new tab" to view the full image.* -##### [Detection Example](doc/basic-pipelines.md#detection-example) -![Detection Example](doc/images/detection.gif) - -# Detection-service Example -![Banner](images/detection_Servier.gif) - -This mode allows integration with minimal changes to your app. -You can test out the integration, as well as see a demo how to integrate it in your app by looking at **test_tools**. -The test tool allows for integrating with either holding for next input, or continuing. - # Troubleshooting and Known Issues If you encounter any issues, please open a ticket in the [Hailo Community Forum](https://community.hailo.ai/). The forum is a valuable resource filled with useful information and potential solutions.