Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use zmq for event end queue and update python deps #10886

Merged
merged 4 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docker/main/requirements-wheels.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ imutils == 0.5.*
markupsafe == 2.1.*
matplotlib == 3.7.*
mypy == 1.6.1
numpy == 1.23.*
numpy == 1.26.*
onvif_zeep == 0.2.12
opencv-python-headless == 4.7.0.*
paho-mqtt == 2.0.*
pandas == 2.1.4
pandas == 2.2.*
peewee == 3.17.*
peewee_migrate == 1.12.*
psutil == 5.9.*
Expand Down
6 changes: 0 additions & 6 deletions frigate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,6 @@ def set_log_levels(self) -> None:
logging.getLogger("ws4py").setLevel("ERROR")

def init_queues(self) -> None:
# Queues for clip processing
self.event_processed_queue: Queue = mp.Queue()

# Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = mp.Queue(
maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2
Expand Down Expand Up @@ -420,7 +417,6 @@ def start_detected_frames_processor(self) -> None:
self.config,
self.dispatcher,
self.detected_frames_queue,
self.event_processed_queue,
self.ptz_autotracker_thread,
self.stop_event,
)
Expand Down Expand Up @@ -517,7 +513,6 @@ def start_timeline_processor(self) -> None:
def start_event_processor(self) -> None:
self.event_processor = EventProcessor(
self.config,
self.event_processed_queue,
self.timeline_queue,
self.stop_event,
)
Expand Down Expand Up @@ -704,7 +699,6 @@ def stop(self) -> None:
shm.unlink()

for queue in [
self.event_processed_queue,
self.detected_frames_queue,
self.log_queue,
]:
Expand Down
2 changes: 1 addition & 1 deletion frigate/comms/detections_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

SOCKET_CONTROL = "inproc://control.detections_updater"
SOCKET_PUB = "ipc:///tmp/cache/detect_pub"
SOCKET_SUB = "ipc:///tmp/cache/detect_sun"
SOCKET_SUB = "ipc:///tmp/cache/detect_sub"


class DetectionTypeEnum(str, Enum):
Expand Down
49 changes: 48 additions & 1 deletion frigate/comms/events_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from frigate.events.types import EventStateEnum, EventTypeEnum

SOCKET_PUSH_PULL = "ipc:///tmp/cache/events"
SOCKET_PUSH_PULL_END = "ipc:///tmp/cache/events_ended"


class EventUpdatePublisher:
Expand Down Expand Up @@ -37,7 +38,53 @@ def __init__(self) -> None:
def check_for_update(
self, timeout=1
) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]:
"""Returns updated config or None if no update."""
"""Returns events or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)

if has_update:
return self.socket.recv_pyobj()
except zmq.ZMQError:
pass

return None

def stop(self) -> None:
self.socket.close()
self.context.destroy()


class EventEndPublisher:
"""Publishes events that have ended."""

def __init__(self) -> None:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUSH)
self.socket.connect(SOCKET_PUSH_PULL_END)

def publish(
self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]
) -> None:
"""There is no communication back to the processes."""
self.socket.send_pyobj(payload)

def stop(self) -> None:
self.socket.close()
self.context.destroy()


class EventEndSubscriber:
"""Receives events that have ended."""

def __init__(self) -> None:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL)
self.socket.bind(SOCKET_PUSH_PULL_END)

def check_for_update(
self, timeout=1
) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]:
"""Returns events ended or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)

Expand Down
8 changes: 4 additions & 4 deletions frigate/events/maintainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from multiprocessing.synchronize import Event as MpEvent
from typing import Dict

from frigate.comms.events_updater import EventUpdateSubscriber
from frigate.comms.events_updater import EventEndPublisher, EventUpdateSubscriber
from frigate.config import EventsConfig, FrigateConfig
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event
Expand Down Expand Up @@ -52,19 +52,18 @@ class EventProcessor(threading.Thread):
def __init__(
self,
config: FrigateConfig,
event_processed_queue: Queue,
timeline_queue: Queue,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "event_processor"
self.config = config
self.event_processed_queue = event_processed_queue
self.timeline_queue = timeline_queue
self.events_in_process: Dict[str, Event] = {}
self.stop_event = stop_event

self.event_receiver = EventUpdateSubscriber()
self.event_end_publisher = EventEndPublisher()

def run(self) -> None:
# set an end_time on events without an end_time on startup
Expand Down Expand Up @@ -118,6 +117,7 @@ def run(self) -> None:
Event.end_time == None
).execute()
self.event_receiver.stop()
self.event_end_publisher.stop()
logger.info("Exiting event processor...")

def handle_object_detection(
Expand Down Expand Up @@ -242,7 +242,7 @@ def handle_object_detection(

if event_type == EventStateEnum.end:
del self.events_in_process[event_data["id"]]
self.event_processed_queue.put((event_data["id"], camera))
self.event_end_publisher.publish((event_data["id"], camera))

def handle_external_detection(
self, event_type: EventStateEnum, event_data: Event
Expand Down
18 changes: 12 additions & 6 deletions frigate/object_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import queue
import threading
from collections import Counter, defaultdict
from multiprocessing.synchronize import Event as MpEvent
from statistics import median
from typing import Callable

Expand All @@ -14,7 +15,7 @@

from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.dispatcher import Dispatcher
from frigate.comms.events_updater import EventUpdatePublisher
from frigate.comms.events_updater import EventEndSubscriber, EventUpdatePublisher
from frigate.config import (
CameraConfig,
FrigateConfig,
Expand Down Expand Up @@ -827,7 +828,6 @@ def __init__(
config: FrigateConfig,
dispatcher: Dispatcher,
tracked_objects_queue,
event_processed_queue,
ptz_autotracker_thread,
stop_event,
):
Expand All @@ -836,14 +836,14 @@ def __init__(
self.config = config
self.dispatcher = dispatcher
self.tracked_objects_queue = tracked_objects_queue
self.event_processed_queue = event_processed_queue
self.stop_event = stop_event
self.stop_event: MpEvent = stop_event
self.camera_states: dict[str, CameraState] = {}
self.frame_manager = SharedMemoryFrameManager()
self.last_motion_detected: dict[str, float] = {}
self.ptz_autotracker_thread = ptz_autotracker_thread
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video)
self.event_sender = EventUpdatePublisher()
self.event_end_subscriber = EventEndSubscriber()

def start(camera, obj: TrackedObject, current_frame_time):
self.event_sender.publish(
Expand Down Expand Up @@ -1215,10 +1215,16 @@ def run(self):
)

# cleanup event finished queue
while not self.event_processed_queue.empty():
event_id, camera = self.event_processed_queue.get()
while not self.stop_event.is_set():
update = self.event_end_subscriber.check_for_update(timeout=0.01)

if not update:
break

event_id, camera = update
self.camera_states[camera].finished(event_id)

self.detection_publisher.stop()
self.event_sender.stop()
self.event_end_subscriber.stop()
logger.info("Exiting object processor...")
Loading