Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance GStreamer Detection App with GUI and Improved Reliability #41

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 149 additions & 69 deletions basic_pipelines/detection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import gi
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')
gi.require_version('GstVideo', '1.0')
from gi.repository import Gtk, Gdk, GdkX11, GstVideo
from gi.repository import Gst, GLib
import os
import argparse
Expand All @@ -9,6 +11,8 @@
import cv2
import time
import hailo
import threading
import logging
from hailo_rpi_common import (
get_default_parser,
QUEUE,
Expand All @@ -18,131 +22,107 @@
app_callback_class,
)

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
# Inheritance from the app_callback_class
class user_app_callback_class(app_callback_class):
def __init__(self):
super().__init__()
self.new_variable = 42 # New variable example

def new_function(self): # New function example
return "The meaning of life is: "

# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------

# This is the callback function that will be called when data is available from the pipeline
def app_callback(pad, info, user_data):
# Get the GstBuffer from the probe info
buffer = info.get_buffer()
# Check if the buffer is valid
if buffer is None:
return Gst.PadProbeReturn.OK

# Using the user_data to count the number of frames

user_data.increment()
string_to_print = f"Frame count: {user_data.get_count()}\n"

# Get the caps from the pad

format, width, height = get_caps_from_pad(pad)

# If the user_data.use_frame is set to True, we can get the video frame from the buffer
frame = None
if user_data.use_frame and format is not None and width is not None and height is not None:
# Get video frame
frame = get_numpy_from_buffer(buffer, format, width, height)

# Get the detections from the buffer
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

# Parse the detections

detection_count = 0
for detection in detections:
label = detection.get_label()
bbox = detection.get_bbox()
confidence = detection.get_confidence()
if label == "person":
string_to_print += f"Detection: {label} {confidence:.2f}\n"
detection_count += 1
logger.info(string_to_print)

if user_data.use_frame:
# Note: using imshow will not work here, as the callback function is not running in the main thread
# Let's print the detection count to the frame
cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# Example of how to use the new_variable and new_function from the user_data
# Let's print the new_variable and the result of the new_function to the frame
cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# Convert the frame to BGR
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
user_data.set_frame(frame)

print(string_to_print)
#logger.info(string_to_print)
return Gst.PadProbeReturn.OK


# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------

# This class inherits from the hailo_rpi_common.GStreamerApp class
class GStreamerDetectionApp(GStreamerApp):
def __init__(self, args, user_data):
# Call the parent class constructor
super().__init__(args, user_data)
# Additional initialization code can be added here
# Set Hailo parameters these parameters should be set based on the model used
self.window = None
self.batch_size = 2
self.network_width = 640
self.network_height = 640
self.network_format = "RGB"
nms_score_threshold = 0.3
nms_iou_threshold = 0.45

# Temporary code: new postprocess will be merged to TAPPAS.
# Check if new postprocess so file exists
self.nms_score_threshold = 0.3
self.nms_iou_threshold = 0.45
self.max_retries = 5
self.retry_delay = 5 # seconds
self.watchdog_interval = 30 # seconds
self.last_frame_time = time.time()
self.frame_count = 0
self.last_frame_count = 0
self.restart_in_progress = False

new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so')
if os.path.exists(new_postprocess_path):
self.default_postprocess_so = new_postprocess_path
else:
self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')

if args.hef_path is not None:
self.hef_path = args.hef_path
# Set the HEF file path based on the network
elif args.network == "yolov6n":
self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
elif args.network == "yolov8s":
self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
elif args.network == "yolox_s_leaky":
self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef')
else:
assert False, "Invalid network type"

# User-defined label JSON file
if args.labels_json is not None:
self.labels_config = f' config-path={args.labels_json} '
# Temporary code
if not os.path.exists(new_postprocess_path):
print("New postprocess so file is missing. It is required to support custom labels. Check documentation for more information.")
exit(1)
else:
self.labels_config = ''
self.default_postprocess_so = new_postprocess_path if os.path.exists(new_postprocess_path) else os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')

self.hef_path = args.hef_path if args.hef_path else self.get_hef_path(args.network)

self.labels_config = f' config-path={args.labels_json} ' if args.labels_json else ''

self.app_callback = app_callback

self.thresholds_str = (
f"nms-score-threshold={nms_score_threshold} "
f"nms-iou-threshold={nms_iou_threshold} "
f"nms-score-threshold={self.nms_score_threshold} "
f"nms-iou-threshold={self.nms_iou_threshold} "
f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
)

# Set the process title
setproctitle.setproctitle("Hailo Detection App")

self.create_pipeline()

def get_hef_path(self, network):
if network == "yolov6n":
return os.path.join(self.current_path, '../resources/yolov6n.hef')
elif network == "yolov8s":
return os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
elif network == "yolox_s_leaky":
return os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef')
else:
raise ValueError("Invalid network type")

def get_pipeline_string(self):
if self.source_type == "rpi":
Expand All @@ -158,6 +138,17 @@ def get_pipeline_string(self):
f"v4l2src device={self.video_source} name=src_0 ! "
"video/x-raw, width=640, height=480, framerate=30/1 ! "
)
elif self.video_source.startswith("rtsp://"):
source_element = (
f"rtspsrc name=src_0 location={self.video_source} "
"protocols=tcp+udp latency=2000 "
"retry=5 timeout=5000000 tcp-timeout=5000000 "
"drop-on-latency=true do-retransmission=false "
"buffer-mode=auto "
"! rtph265depay ! h265parse ! avdec_h265 "
"! videorate ! videoconvert ! videoscale "
f"! video/x-raw, format=RGB, width={self.network_width}, height={self.network_height} ! "
)
else:
source_element = (
f"filesrc location={self.video_source} name=src_0 ! "
Expand Down Expand Up @@ -194,16 +185,105 @@ def get_pipeline_string(self):
+ QUEUE("queue_videoconvert")
+ "videoconvert n-threads=3 qos=false ! "
+ QUEUE("queue_hailo_display")
+ f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
+ "ximagesink name=sink"
)
print(pipeline_string)
logger.debug(f"Pipeline string: {pipeline_string}")
return pipeline_string

def start_watchdog(self):
threading.Thread(target=self.watchdog_thread, daemon=True).start()

def watchdog_thread(self):
while True:
time.sleep(self.watchdog_interval)
if self.frame_count == self.last_frame_count and not self.restart_in_progress:
logger.warning("No new frames received. Restarting pipeline...")
self.restart_pipeline()
self.last_frame_count = self.frame_count
def restart_pipeline(self):
if self.restart_in_progress:
logger.warning("Restart already in progress, skipping...")
return

self.restart_in_progress = True
try:
logger.info("Stopping pipeline...")
self.pipeline.set_state(Gst.State.NULL)
time.sleep(2) # Give some time for cleanup

logger.info("Recreating pipeline...")
self.create_pipeline()
time.sleep(2) # Give some time for setup

logger.info("Starting pipeline...")
self.pipeline.set_state(Gst.State.PLAYING)
self.add_probe() # Re-add the probe after recreating the pipeline
except Exception as e:
logger.error(f"Error during pipeline restart: {e}")
finally:
self.restart_in_progress = False
def add_probe(self):
identity_callback = self.pipeline.get_by_name("identity_callback")
if identity_callback:
pad = identity_callback.get_static_pad("src")
if pad:
pad.add_probe(Gst.PadProbeType.BUFFER, self.probe_callback, self.user_data)
else:
logger.error("Failed to get pad from identity_callback")
else:
logger.error("Failed to get identity_callback element")
def probe_callback(self, pad, info, user_data):
self.frame_count += 1
self.last_frame_time = time.time()
return self.app_callback(pad, info, user_data)
def run(self):
self.start_watchdog()
self.create_window()
for attempt in range(self.max_retries):
try:
self.create_pipeline()
self.add_probe()

# Get the ximagesink element and set its window handle
sink = self.pipeline.get_by_name('sink')
self.video_area.realize()
xid = self.video_area.get_window().get_xid()
sink.set_window_handle(xid)

self.pipeline.set_state(Gst.State.PLAYING)
Gtk.main()
break
except GLib.Error as e:
if "Could not read from resource" in str(e) and attempt < self.max_retries - 1:
logger.error(f"RTSP connection failed. Retrying in {self.retry_delay} seconds...")
time.sleep(self.retry_delay)
else:
logger.error(f"Unhandled error: {e}")
raise
def quit(self):
self.pipeline.set_state(Gst.State.NULL)
if hasattr(self, 'loop') and self.loop.is_running():
self.loop.quit()
def create_window(self):
self.window = Gtk.Window(title="Hailo Detection App")
self.window.connect("delete-event", self.on_window_close)
self.window.set_default_size(640, 640) # Set your desired size

box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.window.add(box)

self.video_area = Gtk.DrawingArea()
box.pack_start(self.video_area, True, True, 0)

self.window.show_all()
def on_window_close(self, *args):
self.quit()
Gtk.main_quit()
return False
if __name__ == "__main__":
# Create an instance of the user app callback class
Gtk.init(None)
user_data = user_app_callback_class()
parser = get_default_parser()
# Add additional arguments here
parser.add_argument(
"--network",
default="yolov6n",
Expand Down