diff --git a/docker/main/Dockerfile b/docker/main/Dockerfile index 1b10585738..75f5b8ed80 100644 --- a/docker/main/Dockerfile +++ b/docker/main/Dockerfile @@ -114,6 +114,8 @@ RUN apt-get -qq update \ gfortran openexr libatlas-base-dev libssl-dev\ libtbb2 libtbb-dev libdc1394-22-dev libopenexr-dev \ libgstreamer-plugins-base1.0-dev libgstreamer1.0-dev \ + # sqlite3 dependencies + tclsh \ # scipy dependencies gcc gfortran libopenblas-dev liblapack-dev && \ rm -rf /var/lib/apt/lists/* @@ -127,6 +129,10 @@ RUN wget -q https://bootstrap.pypa.io/get-pip.py -O get-pip.py \ COPY docker/main/requirements.txt /requirements.txt RUN pip3 install -r /requirements.txt +# Build pysqlite3 from source to support ChromaDB +COPY docker/main/build_pysqlite3.sh /build_pysqlite3.sh +RUN /build_pysqlite3.sh + COPY docker/main/requirements-wheels.txt /requirements-wheels.txt RUN pip3 wheel --wheel-dir=/wheels -r /requirements-wheels.txt @@ -153,6 +159,13 @@ ARG APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=DontWarn ENV NVIDIA_VISIBLE_DEVICES=all ENV NVIDIA_DRIVER_CAPABILITIES="compute,video,utility" +# Turn off Chroma Telemetry: https://docs.trychroma.com/telemetry#opting-out +ENV ANONYMIZED_TELEMETRY=False +# Allow resetting the chroma database +ENV ALLOW_RESET=True +# Disable tokenizer parallelism warning +ENV TOKENIZERS_PARALLELISM=true + ENV PATH="/usr/lib/btbn-ffmpeg/bin:/usr/local/go2rtc/bin:/usr/local/tempio/bin:/usr/local/nginx/sbin:${PATH}" # Install dependencies diff --git a/docker/main/build_pysqlite3.sh b/docker/main/build_pysqlite3.sh new file mode 100755 index 0000000000..6375b33fa2 --- /dev/null +++ b/docker/main/build_pysqlite3.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +set -euxo pipefail + +SQLITE3_VERSION="96c92aba00c8375bc32fafcdf12429c58bd8aabfcadab6683e35bbb9cdebf19e" # 3.46.0 +PYSQLITE3_VERSION="0.5.3" + +# Fetch the source code for the latest release of Sqlite. +if [[ ! -d "sqlite" ]]; then + wget https://www.sqlite.org/src/tarball/sqlite.tar.gz?r=${SQLITE3_VERSION} -O sqlite.tar.gz + tar xzf sqlite.tar.gz + cd sqlite/ + LIBS="-lm" ./configure --disable-tcl --enable-tempstore=always + make sqlite3.c + cd ../ + rm sqlite.tar.gz +fi + +# Grab the pysqlite3 source code. +if [[ ! -d "./pysqlite3" ]]; then + git clone https://github.com/coleifer/pysqlite3.git +fi + +cd pysqlite3/ +git checkout ${PYSQLITE3_VERSION} + +# Copy the sqlite3 source amalgamation into the pysqlite3 directory so we can +# create a self-contained extension module. +cp "../sqlite/sqlite3.c" ./ +cp "../sqlite/sqlite3.h" ./ + +# Create the wheel and put it in the /wheels dir. +sed -i "s|name='pysqlite3-binary'|name=PACKAGE_NAME|g" setup.py +python3 setup.py build_static +pip3 wheel . -w /wheels diff --git a/docker/main/requirements-wheels.txt b/docker/main/requirements-wheels.txt index 84c5a867c1..4b4e13850e 100644 --- a/docker/main/requirements-wheels.txt +++ b/docker/main/requirements-wheels.txt @@ -30,3 +30,10 @@ ws4py == 0.5.* unidecode == 1.3.* onnxruntime == 1.18.* openvino == 2024.1.* +# Embeddings +onnx_clip == 4.0.* +chromadb == 0.5.0 +# Generative AI +google-generativeai == 0.6.* +ollama == 0.2.* +openai == 1.30.* \ No newline at end of file diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/consumer-for b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/consumer-for new file mode 100644 index 0000000000..4b935d3cb5 --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/consumer-for @@ -0,0 +1 @@ +chroma \ No newline at end of file diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/dependencies.d/log-prepare b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/dependencies.d/log-prepare new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/pipeline-name b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/pipeline-name new file mode 100644 index 0000000000..71256e9ed9 --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/pipeline-name @@ -0,0 +1 @@ +chroma-pipeline \ No newline at end of file diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/run b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/run new file mode 100755 index 0000000000..2e47fd3ebe --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/run @@ -0,0 +1,4 @@ +#!/command/with-contenv bash +# shellcheck shell=bash + +exec logutil-service /dev/shm/logs/chroma diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/type b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/type new file mode 100644 index 0000000000..5883cff0cd --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/type @@ -0,0 +1 @@ +longrun diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/dependencies.d/base b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/dependencies.d/base new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/finish b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/finish new file mode 100644 index 0000000000..b6206b4ccf --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/finish @@ -0,0 +1,28 @@ +#!/command/with-contenv bash +# shellcheck shell=bash +# Take down the S6 supervision tree when the service exits + +set -o errexit -o nounset -o pipefail + +# Logs should be sent to stdout so that s6 can collect them + +declare exit_code_container +exit_code_container=$(cat /run/s6-linux-init-container-results/exitcode) +readonly exit_code_container +readonly exit_code_service="${1}" +readonly exit_code_signal="${2}" +readonly service="ChromaDB" + +echo "[INFO] Service ${service} exited with code ${exit_code_service} (by signal ${exit_code_signal})" + +if [[ "${exit_code_service}" -eq 256 ]]; then + if [[ "${exit_code_container}" -eq 0 ]]; then + echo $((128 + exit_code_signal)) >/run/s6-linux-init-container-results/exitcode + fi +elif [[ "${exit_code_service}" -ne 0 ]]; then + if [[ "${exit_code_container}" -eq 0 ]]; then + echo "${exit_code_service}" >/run/s6-linux-init-container-results/exitcode + fi +fi + +exec /run/s6/basedir/bin/halt diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/producer-for b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/producer-for new file mode 100644 index 0000000000..c17b71e87a --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/producer-for @@ -0,0 +1 @@ +chroma-log diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/run b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/run new file mode 100644 index 0000000000..bf28a56b4d --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/run @@ -0,0 +1,16 @@ +#!/command/with-contenv bash +# shellcheck shell=bash +# Start the Frigate service + +set -o errexit -o nounset -o pipefail + +# Logs should be sent to stdout so that s6 can collect them + +# Tell S6-Overlay not to restart this service +s6-svc -O . + +echo "[INFO] Starting ChromaDB..." + +# Replace the bash process with the Frigate process, redirecting stderr to stdout +exec 2>&1 +exec /usr/local/chroma run --path /config/chroma --host 0.0.0.0 diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/timeout-kill b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/timeout-kill new file mode 100644 index 0000000000..6f4f418441 --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/timeout-kill @@ -0,0 +1 @@ +120000 diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/type b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/type new file mode 100644 index 0000000000..5883cff0cd --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/type @@ -0,0 +1 @@ +longrun diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/frigate/dependencies.d/chroma b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/frigate/dependencies.d/chroma new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/log-prepare/run b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/log-prepare/run index c493e320ee..0661f01c2f 100755 --- a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/log-prepare/run +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/log-prepare/run @@ -4,7 +4,7 @@ set -o errexit -o nounset -o pipefail -dirs=(/dev/shm/logs/frigate /dev/shm/logs/go2rtc /dev/shm/logs/nginx /dev/shm/logs/certsync) +dirs=(/dev/shm/logs/frigate /dev/shm/logs/go2rtc /dev/shm/logs/nginx /dev/shm/logs/certsync /dev/shm/logs/chroma) mkdir -p "${dirs[@]}" chown nobody:nogroup "${dirs[@]}" diff --git a/docker/main/rootfs/usr/local/chroma b/docker/main/rootfs/usr/local/chroma new file mode 100755 index 0000000000..5147db3877 --- /dev/null +++ b/docker/main/rootfs/usr/local/chroma @@ -0,0 +1,14 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*-s +__import__("pysqlite3") + +import re +import sys + +sys.modules["sqlite3"] = sys.modules.pop("pysqlite3") + +from chromadb.cli.cli import app + +if __name__ == "__main__": + sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0]) + sys.exit(app()) diff --git a/docs/docs/configuration/genai.md b/docs/docs/configuration/genai.md new file mode 100644 index 0000000000..ddd2e9e2ae --- /dev/null +++ b/docs/docs/configuration/genai.md @@ -0,0 +1,135 @@ +--- +id: genai +title: Generative AI +--- + +Generative AI can be used to automatically generate descriptions based on the thumbnails of your events. This helps with [semantic search](/configuration/semantic_search) in Frigate by providing detailed text descriptions as a basis of the search query. + +## Configuration + +Generative AI can be enabled for all cameras or only for specific cameras. There are currently 3 providers available to integrate with Frigate. + +If the provider you choose requires an API key, you may either directly paste it in your configuration, or store it in an environment variable prefixed with `FRIGATE_`. + +```yaml +genai: + enabled: True + provider: gemini + api_key: "{FRIGATE_GEMINI_API_KEY}" + model: gemini-1.5-flash + +cameras: + front_camera: ... + indoor_camera: + genai: # <- disable GenAI for your indoor camera + enabled: False +``` + +## Ollama + +[Ollama](https://ollama.com/) allows you to self-host large language models and keep everything running locally. It provides a nice API over [llama.cpp](https://github.com/ggerganov/llama.cpp). It is highly recommended to host this server on a machine with an Nvidia graphics card, or on a Apple silicon Mac for best performance. Most of the 7b parameter 4-bit vision models will fit inside 8GB of VRAM. There is also a [docker container](https://hub.docker.com/r/ollama/ollama) available. + +### Supported Models + +You must use a vision capable model with Frigate. Current model variants can be found [in their model library](https://ollama.com/library). At the time of writing, this includes `llava`, `llava-llama3`, `llava-phi3`, and `moondream`. + +:::note + +You should have at least 8 GB of RAM available (or VRAM if running on GPU) to run the 7B models, 16 GB to run the 13B models, and 32 GB to run the 33B models. + +::: + +### Configuration + +```yaml +genai: + enabled: True + provider: ollama + base_url: http://localhost:11434 + model: llava +``` + +## Google Gemini + +Google Gemini has a free tier allowing [15 queries per minute](https://ai.google.dev/pricing) to the API, which is more than sufficient for standard Frigate usage. + +### Supported Models + +You must use a vision capable model with Frigate. Current model variants can be found [in their documentation](https://ai.google.dev/gemini-api/docs/models/gemini). At the time of writing, this includes `gemini-1.5-pro` and `gemini-1.5-flash`. + +### Get API Key + +To start using Gemini, you must first get an API key from [Google AI Studio](https://aistudio.google.com). + +1. Accept the Terms of Service +2. Click "Get API Key" from the right hand navigation +3. Click "Create API key in new project" +4. Copy the API key for use in your config + +### Configuration + +```yaml +genai: + enabled: True + provider: gemini + api_key: "{FRIGATE_GEMINI_API_KEY}" + model: gemini-1.5-flash +``` + +## OpenAI + +OpenAI does not have a free tier for their API. With the release of gpt-4o, pricing has been reduced and each generation should cost fractions of a cent if you choose to go this route. + +### Supported Models + +You must use a vision capable model with Frigate. Current model variants can be found [in their documentation](https://platform.openai.com/docs/models). At the time of writing, this includes `gpt-4o` and `gpt-4-turbo`. + +### Get API Key + +To start using OpenAI, you must first [create an API key](https://platform.openai.com/api-keys) and [configure billing](https://platform.openai.com/settings/organization/billing/overview). + +### Configuration + +```yaml +genai: + enabled: True + provider: openai + api_key: "{FRIGATE_OPENAI_API_KEY}" + model: gpt-4o +``` + +## Custom Prompts + +Frigate sends multiple frames from the detection along with a prompt to your Generative AI provider asking it to generate a description. The default prompt is as follows: + +``` +Describe the {label} in the sequence of images with as much detail as possible. Do not describe the background. +``` + +:::tip + +Prompts can use variable replacements like `{label}`, `{sub_label}`, and `{camera}` to substitute information from the detection as part of the prompt. + +::: + +You are also able to define custom prompts in your configuration. + +```yaml +genai: + enabled: True + provider: ollama + base_url: http://localhost:11434 + model: llava + prompt: "Describe the {label} in these images from the {camera} security camera." + object_prompts: + person: "Describe the main person in these images (gender, age, clothing, activity, etc). Do not include where the activity is occurring (sidewalk, concrete, driveway, etc). If delivering a package, include the company the package is from." + car: "Label the primary vehicle in these images with just the name of the company if it is a delivery vehicle, or the color make and model." +``` + +### Experiment with prompts + +Providers also has a public facing chat interface for their models. Download a couple different thumbnails or snapshots from Frigate and try new things in the playground to get descriptions to your liking before updating the prompt in Frigate. + +- OpenAI - [ChatGPT](https://chatgpt.com) +- Gemini - [Google AI Studio](https://aistudio.google.com) +- Ollama - [Open WebUI](https://docs.openwebui.com/) diff --git a/docs/docs/configuration/index.md b/docs/docs/configuration/index.md index d1e382e40d..01fe97530c 100644 --- a/docs/docs/configuration/index.md +++ b/docs/docs/configuration/index.md @@ -56,6 +56,11 @@ go2rtc: password: "{FRIGATE_GO2RTC_RTSP_PASSWORD}" ``` +```yaml +genai: + api_key: "{FRIGATE_GENAI_API_KEY}" +``` + ## Common configuration examples Here are some common starter configuration examples. Refer to the [reference config](./reference.md) for detailed information about all the config values. diff --git a/docs/docs/configuration/reference.md b/docs/docs/configuration/reference.md index a90a3241e2..fa94c98aa3 100644 --- a/docs/docs/configuration/reference.md +++ b/docs/docs/configuration/reference.md @@ -465,6 +465,35 @@ snapshots: # Optional: quality of the encoded jpeg, 0-100 (default: shown below) quality: 70 +# Optional: Configuration for semantic search capability +semantic_search: + # Optional: Enable semantic search (default: shown below) + enabled: False + # Optional: Re-index embeddings database from historical events (default: shown below) + reindex: False + +# Optional: Configuration for AI generated event descriptions +# NOTE: Semantic Search must be enabled for this to do anything. +# WARNING: Depending on the provider, this will send thumbnails over the internet +# to Google or OpenAI's LLMs to generate descriptions. It can be overridden at +# the camera level (enabled: False) to enhance privacy for indoor cameras. +genai: + # Optional: Enable Google Gemini description generation (default: shown below) + enabled: False + # Required if enabled: Provider must be one of ollama, gemini, or openai + provider: ollama + # Required if provider is ollama. May also be used for an OpenAI API compatible backend with the openai provider. + base_url: http://localhost::11434 + # Required if gemini or openai + api_key: "{FRIGATE_GENAI_API_KEY}" + # Optional: The default prompt for generating descriptions. Can use replacement + # variables like "label", "sub_label", "camera" to make more dynamic. (default: shown below) + prompt: "Describe the {label} in the sequence of images with as much detail as possible. Do not describe the background." + # Optional: Object specific prompts to customize description results + # Format: {label}: {prompt} + object_prompts: + person: "My special person prompt." + # Optional: Restream configuration # Uses https://github.com/AlexxIT/go2rtc (v1.8.3) go2rtc: diff --git a/docs/docs/configuration/semantic_search.md b/docs/docs/configuration/semantic_search.md new file mode 100644 index 0000000000..d8ab61d413 --- /dev/null +++ b/docs/docs/configuration/semantic_search.md @@ -0,0 +1,38 @@ +--- +id: semantic_search +title: Using Semantic Search +--- + +Semantic search works by embedding images and/or text into a vector representation identified by numbers. Frigate has support for two such models which both run locally: [OpenAI CLIP](https://openai.com/research/clip) and [all-MiniLM-L6-v2](https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2). Embeddings are then saved to a local instance of [ChromaDB](https://trychroma.com). + +## Configuration + +Semantic Search is a global configuration setting. + +```yaml +semantic_search: + enabled: True + reindex: False +``` + +:::tip + +The embeddings database can be re-indexed from the existing detections in your database by adding `reindex: True` to your `semantic_search` configuration. Depending on the number of detections you have, it can take up to 30 minutes to complete and may max out your CPU while indexing. Make sure to set the config back to `False` before restarting Frigate again. + +::: + +### OpenAI CLIP + +This model is able to embed both images and text into the same vector space, which allows `image -> image` and `text -> image` similarity searches. Frigate uses this model on detections to encode the thumbnail image and store it in Chroma. When searching detections via text in the search box, frigate will perform a `text -> image` similarity search against this embedding. When clicking "FIND SIMILAR" next to a detection, Frigate will perform an `image -> image` similarity search to retrieve the closest matching thumbnails. + +### all-MiniLM-L6-v2 + +This is a sentence embedding model that has been fine tuned on over 1 billion sentence pairs. This model is used to embed detection descriptions and perform searches against them. Descriptions can be created and/or modified on the search page when clicking on the info icon next to a detection. See [the Generative AI docs](/configuration/genai.md) for more information on how to automatically generate event descriptions. + +## Usage Tips + +1. Semantic search is used in conjunction with the other filters available on the search page. Use a combination of traditional filtering and semantic search for the best results. +2. The comparison between text and image embedding distances generally means that results matching `description` will appear first, even if a `thumbnail` embedding may be a better match. Play with the "Search Type" filter to help find what you are looking for. +3. Make your search language and tone closely match your descriptions. If you are using thumbnail search, phrase your query as an image caption. +4. Semantic search on thumbnails tends to return better results when matching large subjects that take up most of the frame. Small things like "cat" tend to not work well. +5. Experiment! Find a detection you want to test and start typing keywords to see what works for you. diff --git a/docs/sidebars.js b/docs/sidebars.js index da41564cad..1e1a270464 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -29,6 +29,10 @@ module.exports = { "configuration/object_detectors", "configuration/audio_detectors", ], + "Semantic Search": [ + "configuration/semantic_search", + "configuration/genai", + ], Cameras: [ "configuration/cameras", "configuration/review", diff --git a/frigate/__main__.py b/frigate/__main__.py index 8442069082..7106f02097 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,9 +1,12 @@ import faulthandler +import sys import threading from flask import cli -from frigate.app import FrigateApp +# Hotsawp the sqlite3 module for Chroma compatibility +__import__("pysqlite3") +sys.modules["sqlite3"] = sys.modules.pop("pysqlite3") faulthandler.enable() @@ -12,6 +15,8 @@ cli.show_server_banner = lambda *x: None if __name__ == "__main__": + from frigate.app import FrigateApp + frigate_app = FrigateApp() frigate_app.start() diff --git a/frigate/api/app.py b/frigate/api/app.py index 139b10d5b6..5fec51c03b 100644 --- a/frigate/api/app.py +++ b/frigate/api/app.py @@ -454,6 +454,7 @@ def logs(service: str): "frigate": "/dev/shm/logs/frigate/current", "go2rtc": "/dev/shm/logs/go2rtc/current", "nginx": "/dev/shm/logs/nginx/current", + "chroma": "/dev/shm/logs/chroma/current", } service_location = log_locations.get(service) diff --git a/frigate/app.py b/frigate/app.py index 7e845f44ab..840686f0a7 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -22,11 +22,11 @@ from frigate.api.app import create_app from frigate.api.auth import hash_password from frigate.comms.config_updater import ConfigPublisher -from frigate.comms.detections_updater import DetectionProxy from frigate.comms.dispatcher import Communicator, Dispatcher from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient from frigate.comms.ws import WebSocketClient +from frigate.comms.zmq_proxy import ZmqProxy from frigate.config import FrigateConfig from frigate.const import ( CACHE_DIR, @@ -37,6 +37,8 @@ MODEL_CACHE_DIR, RECORD_DIR, ) +from frigate.embeddings import manage_embeddings +from frigate.embeddings.embeddings import Embeddings from frigate.events.audio import listen_to_audio from frigate.events.cleanup import EventCleanup from frigate.events.external import ExternalEventProcessor @@ -316,7 +318,21 @@ def init_review_segment_manager(self) -> None: self.review_segment_process = review_segment_process review_segment_process.start() self.processes["review_segment"] = review_segment_process.pid or 0 - logger.info(f"Recording process started: {review_segment_process.pid}") + logger.info(f"Review process started: {review_segment_process.pid}") + + def init_embeddings_manager(self) -> None: + # Create a client for other processes to use + self.embeddings = Embeddings() + embedding_process = mp.Process( + target=manage_embeddings, + name="embeddings_manager", + args=(self.config,), + ) + embedding_process.daemon = True + self.embedding_process = embedding_process + embedding_process.start() + self.processes["embeddings"] = embedding_process.pid or 0 + logger.info(f"Embedding process started: {embedding_process.pid}") def bind_database(self) -> None: """Bind db to the main process.""" @@ -362,7 +378,7 @@ def init_external_event_processor(self) -> None: def init_inter_process_communicator(self) -> None: self.inter_process_communicator = InterProcessCommunicator() self.inter_config_updater = ConfigPublisher() - self.inter_detection_proxy = DetectionProxy() + self.inter_zmq_proxy = ZmqProxy() def init_web_server(self) -> None: self.flask_app = create_app( @@ -678,6 +694,7 @@ def start(self) -> None: self.init_onvif() self.init_recording_manager() self.init_review_segment_manager() + self.init_embeddings_manager() self.init_go2rtc() self.bind_database() self.check_db_data_migrations() @@ -797,7 +814,7 @@ def stop(self) -> None: # Stop Communicators self.inter_process_communicator.stop() self.inter_config_updater.stop() - self.inter_detection_proxy.stop() + self.inter_zmq_proxy.stop() while len(self.detection_shms) > 0: shm = self.detection_shms.pop() diff --git a/frigate/comms/detections_updater.py b/frigate/comms/detections_updater.py index af7b7b65d5..a60bd0699a 100644 --- a/frigate/comms/detections_updater.py +++ b/frigate/comms/detections_updater.py @@ -1,14 +1,9 @@ """Facilitates communication between processes.""" -import threading from enum import Enum from typing import Optional -import zmq - -SOCKET_CONTROL = "inproc://control.detections_updater" -SOCKET_PUB = "ipc:///tmp/cache/detect_pub" -SOCKET_SUB = "ipc:///tmp/cache/detect_sub" +from .zmq_proxy import Publisher, Subscriber class DetectionTypeEnum(str, Enum): @@ -18,85 +13,31 @@ class DetectionTypeEnum(str, Enum): audio = "audio" -class DetectionProxyRunner(threading.Thread): - def __init__(self, context: zmq.Context[zmq.Socket]) -> None: - threading.Thread.__init__(self) - self.name = "detection_proxy" - self.context = context - - def run(self) -> None: - """Run the proxy.""" - control = self.context.socket(zmq.REP) - control.connect(SOCKET_CONTROL) - incoming = self.context.socket(zmq.XSUB) - incoming.bind(SOCKET_PUB) - outgoing = self.context.socket(zmq.XPUB) - outgoing.bind(SOCKET_SUB) - - zmq.proxy_steerable( - incoming, outgoing, None, control - ) # blocking, will unblock terminate message is received - incoming.close() - outgoing.close() - - -class DetectionProxy: - """Proxies video and audio detections.""" - - def __init__(self) -> None: - self.context = zmq.Context() - self.control = self.context.socket(zmq.REQ) - self.control.bind(SOCKET_CONTROL) - self.runner = DetectionProxyRunner(self.context) - self.runner.start() - - def stop(self) -> None: - self.control.send("TERMINATE".encode()) # tell the proxy to stop - self.runner.join() - self.context.destroy() - - -class DetectionPublisher: +class DetectionPublisher(Publisher): """Simplifies receiving video and audio detections.""" - def __init__(self, topic: DetectionTypeEnum) -> None: - self.topic = topic - self.context = zmq.Context() - self.socket = self.context.socket(zmq.PUB) - self.socket.connect(SOCKET_PUB) - - def send_data(self, payload: any) -> None: - """Publish detection.""" - self.socket.send_string(self.topic.value, flags=zmq.SNDMORE) - self.socket.send_json(payload) - - def stop(self) -> None: - self.socket.close() - self.context.destroy() - - -class DetectionSubscriber: - """Simplifies receiving video and audio detections.""" + topic_base = "detection/" def __init__(self, topic: DetectionTypeEnum) -> None: - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) - self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value) - self.socket.connect(SOCKET_SUB) + topic = topic.value + super().__init__(topic) - def get_data(self, timeout: float = None) -> Optional[tuple[str, any]]: - """Returns detections or None if no update.""" - try: - has_update, _, _ = zmq.select([self.socket], [], [], timeout) - if has_update: - topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)] - return (topic, self.socket.recv_json()) - except zmq.ZMQError: - pass +class DetectionSubscriber(Subscriber): + """Simplifies receiving video and audio detections.""" - return (None, None) + topic_base = "detection/" - def stop(self) -> None: - self.socket.close() - self.context.destroy() + def __init__(self, topic: DetectionTypeEnum) -> None: + topic = topic.value + super().__init__(topic) + + def check_for_update( + self, timeout: float = None + ) -> Optional[tuple[DetectionTypeEnum, any]]: + return super().check_for_update(timeout) + + def _return_object(self, topic: str, payload: any) -> any: + if payload is None: + return (None, None) + return (DetectionTypeEnum[topic[len(self.topic_base) :]], payload) diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index db6c44c110..2369a9a3c0 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -14,9 +14,10 @@ INSERT_PREVIEW, REQUEST_REGION_GRID, UPDATE_CAMERA_ACTIVITY, + UPDATE_EVENT_DESCRIPTION, UPSERT_REVIEW_SEGMENT, ) -from frigate.models import Previews, Recordings, ReviewSegment +from frigate.models import Event, Previews, Recordings, ReviewSegment from frigate.ptz.onvif import OnvifCommandEnum, OnvifController from frigate.types import PTZMetricsTypes from frigate.util.object import get_camera_regions_grid @@ -128,6 +129,10 @@ def _receive(self, topic: str, payload: str) -> Optional[Any]: ).execute() elif topic == UPDATE_CAMERA_ACTIVITY: self.camera_activity = payload + elif topic == UPDATE_EVENT_DESCRIPTION: + event: Event = Event.get(Event.id == payload["id"]) + event.data["description"] = payload["description"] + event.save() elif topic == "onConnect": self.publish("camera_activity", json.dumps(self.camera_activity)) else: diff --git a/frigate/comms/events_updater.py b/frigate/comms/events_updater.py index dd8caf8a30..7a5772273d 100644 --- a/frigate/comms/events_updater.py +++ b/frigate/comms/events_updater.py @@ -1,100 +1,51 @@ """Facilitates communication between processes.""" -import zmq - from frigate.events.types import EventStateEnum, EventTypeEnum -SOCKET_PUSH_PULL = "ipc:///tmp/cache/events" -SOCKET_PUSH_PULL_END = "ipc:///tmp/cache/events_ended" +from .zmq_proxy import Publisher, Subscriber -class EventUpdatePublisher: +class EventUpdatePublisher(Publisher): """Publishes events (objects, audio, manual).""" + topic_base = "event/" + def __init__(self) -> None: - self.context = zmq.Context() - self.socket = self.context.socket(zmq.PUSH) - self.socket.connect(SOCKET_PUSH_PULL) + super().__init__("update") def publish( self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]] ) -> None: - """There is no communication back to the processes.""" - self.socket.send_json(payload) - - def stop(self) -> None: - self.socket.close() - self.context.destroy() + super().publish(payload) -class EventUpdateSubscriber: +class EventUpdateSubscriber(Subscriber): """Receives event updates.""" - def __init__(self) -> None: - self.context = zmq.Context() - self.socket = self.context.socket(zmq.PULL) - self.socket.bind(SOCKET_PUSH_PULL) - - def check_for_update( - self, timeout=1 - ) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]: - """Returns events or None if no update.""" - try: - has_update, _, _ = zmq.select([self.socket], [], [], timeout) - - if has_update: - return self.socket.recv_json() - except zmq.ZMQError: - pass + topic_base = "event/" - return None - - def stop(self) -> None: - self.socket.close() - self.context.destroy() + def __init__(self) -> None: + super().__init__("update") -class EventEndPublisher: +class EventEndPublisher(Publisher): """Publishes events that have ended.""" + topic_base = "event/" + def __init__(self) -> None: - self.context = zmq.Context() - self.socket = self.context.socket(zmq.PUSH) - self.socket.connect(SOCKET_PUSH_PULL_END) + super().__init__("finalized") def publish( self, payload: tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]] ) -> None: - """There is no communication back to the processes.""" - self.socket.send_json(payload) + super().publish(payload) - def stop(self) -> None: - self.socket.close() - self.context.destroy() - -class EventEndSubscriber: +class EventEndSubscriber(Subscriber): """Receives events that have ended.""" + topic_base = "event/" + def __init__(self) -> None: - self.context = zmq.Context() - self.socket = self.context.socket(zmq.PULL) - self.socket.bind(SOCKET_PUSH_PULL_END) - - def check_for_update( - self, timeout=1 - ) -> tuple[EventTypeEnum, EventStateEnum, str, dict[str, any]]: - """Returns events ended or None if no update.""" - try: - has_update, _, _ = zmq.select([self.socket], [], [], timeout) - - if has_update: - return self.socket.recv_json() - except zmq.ZMQError: - pass - - return None - - def stop(self) -> None: - self.socket.close() - self.context.destroy() + super().__init__("finalized") diff --git a/frigate/comms/zmq_proxy.py b/frigate/comms/zmq_proxy.py new file mode 100644 index 0000000000..b6012966f1 --- /dev/null +++ b/frigate/comms/zmq_proxy.py @@ -0,0 +1,100 @@ +"""Facilitates communication over zmq proxy.""" + +import threading +from typing import Optional + +import zmq + +SOCKET_PUB = "ipc:///tmp/cache/proxy_pub" +SOCKET_SUB = "ipc:///tmp/cache/proxy_sub" + + +class ZmqProxyRunner(threading.Thread): + def __init__(self, context: zmq.Context[zmq.Socket]) -> None: + threading.Thread.__init__(self) + self.name = "detection_proxy" + self.context = context + + def run(self) -> None: + """Run the proxy.""" + incoming = self.context.socket(zmq.XSUB) + incoming.bind(SOCKET_PUB) + outgoing = self.context.socket(zmq.XPUB) + outgoing.bind(SOCKET_SUB) + + # Blocking: This will unblock (via exception) when we destroy the context + # The incoming and outgoing sockets will be closed automatically + # when the context is destroyed as well. + try: + zmq.proxy(incoming, outgoing) + except zmq.ZMQError: + pass + + +class ZmqProxy: + """Proxies video and audio detections.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.runner = ZmqProxyRunner(self.context) + self.runner.start() + + def stop(self) -> None: + # destroying the context will tell the proxy to stop + self.context.destroy() + self.runner.join() + + +class Publisher: + """Publishes messages.""" + + topic_base: str = "" + + def __init__(self, topic: str = "") -> None: + self.topic = f"{self.topic_base}{topic}" + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.connect(SOCKET_PUB) + + def publish(self, payload: any, sub_topic: str = "") -> None: + """Publish message.""" + self.socket.send_string(f"{self.topic}{sub_topic}", flags=zmq.SNDMORE) + self.socket.send_json(payload) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + +class Subscriber: + """Receives messages.""" + + topic_base: str = "" + + def __init__(self, topic: str = "") -> None: + self.topic = f"{self.topic_base}{topic}" + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic) + self.socket.connect(SOCKET_SUB) + + def check_for_update(self, timeout: float = 1) -> Optional[tuple[str, any]]: + """Returns message or None if no update.""" + try: + has_update, _, _ = zmq.select([self.socket], [], [], timeout) + + if has_update: + topic = self.socket.recv_string(flags=zmq.NOBLOCK) + payload = self.socket.recv_json() + return self._return_object(topic, payload) + except zmq.ZMQError: + pass + + return self._return_object("", None) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + def _return_object(self, topic: str, payload: any) -> any: + return payload diff --git a/frigate/config.py b/frigate/config.py index c62ccd9f1b..20a540919c 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -730,6 +730,38 @@ class ReviewConfig(FrigateBaseModel): ) +class SemanticSearchConfig(FrigateBaseModel): + enabled: bool = Field(default=True, title="Enable semantic search.") + reindex: Optional[bool] = Field( + default=False, title="Reindex all detections on startup." + ) + + +class GenAIProviderEnum(str, Enum): + openai = "openai" + gemini = "gemini" + ollama = "ollama" + + +class GenAIConfig(FrigateBaseModel): + enabled: bool = Field(default=False, title="Enable GenAI.") + provider: GenAIProviderEnum = Field( + default=GenAIProviderEnum.openai, title="GenAI provider." + ) + base_url: Optional[str] = Field(None, title="Provider base url.") + api_key: Optional[str] = Field(None, title="Provider API key.") + model: str = Field(default="gpt-4o", title="GenAI model.") + prompt: str = Field( + default="Describe the {label} in the sequence of images with as much detail as possible. Do not describe the background.", + title="Default caption prompt.", + ) + object_prompts: Dict[str, str] = Field(default={}, title="Object specific prompts.") + + +class GenAICameraConfig(FrigateBaseModel): + enabled: bool = Field(default=False, title="Enable GenAI for camera.") + + class AudioConfig(FrigateBaseModel): enabled: bool = Field(default=False, title="Enable audio events.") max_not_heard: int = Field( @@ -1011,6 +1043,9 @@ class CameraConfig(FrigateBaseModel): review: ReviewConfig = Field( default_factory=ReviewConfig, title="Review configuration." ) + genai: GenAICameraConfig = Field( + default_factory=GenAICameraConfig, title="Generative AI configuration." + ) audio: AudioConfig = Field( default_factory=AudioConfig, title="Audio events configuration." ) @@ -1363,6 +1398,12 @@ class FrigateConfig(FrigateBaseModel): review: ReviewConfig = Field( default_factory=ReviewConfig, title="Review configuration." ) + semantic_search: SemanticSearchConfig = Field( + default_factory=SemanticSearchConfig, title="Semantic search configuration." + ) + genai: GenAIConfig = Field( + default_factory=GenAIConfig, title="Generative AI configuration." + ) audio: AudioConfig = Field( default_factory=AudioConfig, title="Global Audio events configuration." ) @@ -1397,6 +1438,10 @@ def runtime_config(self, plus_api: PlusApi = None) -> FrigateConfig: config.mqtt.user = config.mqtt.user.format(**FRIGATE_ENV_VARS) config.mqtt.password = config.mqtt.password.format(**FRIGATE_ENV_VARS) + # GenAI substitution + if config.genai.api_key: + config.genai.api_key = config.genai.api_key.format(**FRIGATE_ENV_VARS) + # set default min_score for object attributes for attribute in ALL_ATTRIBUTE_LABELS: if not config.objects.filters.get(attribute): @@ -1418,6 +1463,7 @@ def runtime_config(self, plus_api: PlusApi = None) -> FrigateConfig: "live": ..., "objects": ..., "review": ..., + "genai": {"enabled"}, "motion": ..., "detect": ..., "ffmpeg": ..., diff --git a/frigate/const.py b/frigate/const.py index b55187744c..7cdb2b6725 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -81,6 +81,7 @@ UPSERT_REVIEW_SEGMENT = "upsert_review_segment" CLEAR_ONGOING_REVIEW_SEGMENTS = "clear_ongoing_review_segments" UPDATE_CAMERA_ACTIVITY = "update_camera_activity" +UPDATE_EVENT_DESCRIPTION = "update_event_description" # Stats Values diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py new file mode 100644 index 0000000000..41af73c012 --- /dev/null +++ b/frigate/embeddings/__init__.py @@ -0,0 +1,67 @@ +"""ChromaDB embeddings database.""" + +import logging +import multiprocessing as mp +import signal +import sys +import threading +from types import FrameType +from typing import Optional + +from playhouse.sqliteq import SqliteQueueDatabase +from setproctitle import setproctitle + +from frigate.config import FrigateConfig +from frigate.models import Event +from frigate.util.services import listen + +logger = logging.getLogger(__name__) + + +def manage_embeddings(config: FrigateConfig) -> None: + # Only initialize embeddings if semantic search is enabled + if not config.semantic_search.enabled: + return + + stop_event = mp.Event() + + def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + threading.current_thread().name = "process:embeddings_manager" + setproctitle("frigate.embeddings_manager") + listen() + + # Configure Frigate DB + db = SqliteQueueDatabase( + config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), + ) + models = [Event] + db.bind(models) + + # Hotsawp the sqlite3 module for Chroma compatibility + __import__("pysqlite3") + sys.modules["sqlite3"] = sys.modules.pop("pysqlite3") + from .embeddings import Embeddings + from .maintainer import EmbeddingMaintainer + + embeddings = Embeddings() + + # Check if we need to re-index events + if config.semantic_search.reindex: + embeddings.reindex() + + maintainer = EmbeddingMaintainer( + config, + stop_event, + ) + maintainer.start() diff --git a/frigate/embeddings/embeddings.py b/frigate/embeddings/embeddings.py new file mode 100644 index 0000000000..c7a688d120 --- /dev/null +++ b/frigate/embeddings/embeddings.py @@ -0,0 +1,122 @@ +"""ChromaDB embeddings database.""" + +import base64 +import io +import logging +import time + +import numpy as np +from chromadb import Collection +from chromadb import HttpClient as ChromaClient +from chromadb.config import Settings +from PIL import Image +from playhouse.shortcuts import model_to_dict + +from frigate.models import Event + +from .functions.clip import ClipEmbedding +from .functions.minilm_l6_v2 import MiniLMEmbedding + +logger = logging.getLogger(__name__) + + +def get_metadata(event: Event) -> dict: + """Extract valid event metadata.""" + event_dict = model_to_dict(event) + return ( + { + k: v + for k, v in event_dict.items() + if k not in ["id", "thumbnail"] + and v is not None + and isinstance(v, (str, int, float, bool)) + } + | { + k: v + for k, v in event_dict["data"].items() + if k not in ["description"] + and v is not None + and isinstance(v, (str, int, float, bool)) + } + | { + # Metadata search doesn't support $contains + # and an event can have multiple zones, so + # we need to create a key for each zone + f"{k}_{x}": True + for k, v in event_dict.items() + if isinstance(v, list) and len(v) > 0 + for x in v + if isinstance(x, str) + } + ) + + +class Embeddings: + """ChromaDB embeddings database.""" + + def __init__(self) -> None: + self.client: ChromaClient = ChromaClient( + host="127.0.0.1", + settings=Settings(anonymized_telemetry=False), + ) + + @property + def thumbnail(self) -> Collection: + return self.client.get_or_create_collection( + name="event_thumbnail", embedding_function=ClipEmbedding() + ) + + @property + def description(self) -> Collection: + return self.client.get_or_create_collection( + name="event_description", embedding_function=MiniLMEmbedding() + ) + + def reindex(self) -> None: + """Reindex all event embeddings.""" + logger.info("Indexing event embeddings...") + self.client.reset() + + st = time.time() + + thumbnails = {"ids": [], "images": [], "metadatas": []} + descriptions = {"ids": [], "documents": [], "metadatas": []} + + events = Event.select().where( + (Event.has_clip == True | Event.has_snapshot == True) + & Event.thumbnail.is_null(False) + ) + + event: Event + for event in events.iterator(): + metadata = get_metadata(event) + thumbnail = base64.b64decode(event.thumbnail) + img = np.array(Image.open(io.BytesIO(thumbnail)).convert("RGB")) + thumbnails["ids"].append(event.id) + thumbnails["images"].append(img) + thumbnails["metadatas"].append(metadata) + if event.data.get("description") is not None: + descriptions["ids"].append(event.id) + descriptions["documents"].append(event.data["description"]) + descriptions["metadatas"].append(metadata) + + if len(thumbnails["ids"]) > 0: + self.thumbnail.upsert( + images=thumbnails["images"], + metadatas=thumbnails["metadatas"], + ids=thumbnails["ids"], + ) + + if len(descriptions["ids"]) > 0: + self.description.upsert( + documents=descriptions["documents"], + metadatas=descriptions["metadatas"], + ids=descriptions["ids"], + ) + + logger.info( + "Embedded %d thumbnails and %d descriptions in %s seconds", + len(thumbnails["ids"]), + len(descriptions["ids"]), + time.time() - st, + ) diff --git a/frigate/embeddings/functions/clip.py b/frigate/embeddings/functions/clip.py new file mode 100644 index 0000000000..867938aff4 --- /dev/null +++ b/frigate/embeddings/functions/clip.py @@ -0,0 +1,63 @@ +"""CLIP Embeddings for Frigate.""" + +import os +from typing import Tuple, Union + +import onnxruntime as ort +from chromadb import EmbeddingFunction, Embeddings +from chromadb.api.types import ( + Documents, + Images, + is_document, + is_image, +) +from onnx_clip import OnnxClip + +from frigate.const import MODEL_CACHE_DIR + + +class Clip(OnnxClip): + """Override load models to download to cache directory.""" + + @staticmethod + def _load_models( + model: str, + silent: bool, + ) -> Tuple[ort.InferenceSession, ort.InferenceSession]: + """ + These models are a part of the container. Treat as as such. + """ + if model == "ViT-B/32": + IMAGE_MODEL_FILE = "clip_image_model_vitb32.onnx" + TEXT_MODEL_FILE = "clip_text_model_vitb32.onnx" + elif model == "RN50": + IMAGE_MODEL_FILE = "clip_image_model_rn50.onnx" + TEXT_MODEL_FILE = "clip_text_model_rn50.onnx" + else: + raise ValueError(f"Unexpected model {model}. No `.onnx` file found.") + + models = [] + for model_file in [IMAGE_MODEL_FILE, TEXT_MODEL_FILE]: + path = os.path.join(MODEL_CACHE_DIR, "clip", model_file) + models.append(OnnxClip._load_model(path, silent)) + + return models[0], models[1] + + +class ClipEmbedding(EmbeddingFunction): + """Embedding function for CLIP model used in Chroma.""" + + def __init__(self, model: str = "ViT-B/32"): + """Initialize CLIP Embedding function.""" + self.model = Clip(model) + + def __call__(self, input: Union[Documents, Images]) -> Embeddings: + embeddings: Embeddings = [] + for item in input: + if is_image(item): + result = self.model.get_image_embeddings([item]) + embeddings.append(result[0, :].tolist()) + elif is_document(item): + result = self.model.get_text_embeddings([item]) + embeddings.append(result[0, :].tolist()) + return embeddings diff --git a/frigate/embeddings/functions/minilm_l6_v2.py b/frigate/embeddings/functions/minilm_l6_v2.py new file mode 100644 index 0000000000..f90060fdb3 --- /dev/null +++ b/frigate/embeddings/functions/minilm_l6_v2.py @@ -0,0 +1,11 @@ +"""Embedding function for ONNX MiniLM-L6 model used in Chroma.""" + +from chromadb.utils.embedding_functions import ONNXMiniLM_L6_V2 + +from frigate.const import MODEL_CACHE_DIR + + +class MiniLMEmbedding(ONNXMiniLM_L6_V2): + """Override DOWNLOAD_PATH to download to cache directory.""" + + DOWNLOAD_PATH = f"{MODEL_CACHE_DIR}/all-MiniLM-L6-v2" diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py new file mode 100644 index 0000000000..aac85c01a6 --- /dev/null +++ b/frigate/embeddings/maintainer.py @@ -0,0 +1,197 @@ +"""Maintain embeddings in Chroma.""" + +import base64 +import io +import logging +import threading +from multiprocessing.synchronize import Event as MpEvent +from typing import Optional + +import cv2 +import numpy as np +from peewee import DoesNotExist +from PIL import Image + +from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber +from frigate.comms.inter_process import InterProcessRequestor +from frigate.config import FrigateConfig +from frigate.const import UPDATE_EVENT_DESCRIPTION +from frigate.events.types import EventTypeEnum +from frigate.genai import get_genai_client +from frigate.models import Event +from frigate.util.image import SharedMemoryFrameManager, calculate_region + +from .embeddings import Embeddings, get_metadata + +logger = logging.getLogger(__name__) + + +class EmbeddingMaintainer(threading.Thread): + """Handle embedding queue and post event updates.""" + + def __init__( + self, + config: FrigateConfig, + stop_event: MpEvent, + ) -> None: + threading.Thread.__init__(self) + self.name = "embeddings_maintainer" + self.config = config + self.embeddings = Embeddings() + self.event_subscriber = EventUpdateSubscriber() + self.event_end_subscriber = EventEndSubscriber() + self.frame_manager = SharedMemoryFrameManager() + # create communication for updating event descriptions + self.requestor = InterProcessRequestor() + self.stop_event = stop_event + self.tracked_events = {} + self.genai_client = get_genai_client(config.genai) + + def run(self) -> None: + """Maintain a Chroma vector database for semantic search.""" + while not self.stop_event.is_set(): + self._process_updates() + self._process_finalized() + + self.event_subscriber.stop() + self.event_end_subscriber.stop() + self.requestor.stop() + logger.info("Exiting embeddings maintenance...") + + def _process_updates(self) -> None: + """Process event updates""" + update = self.event_subscriber.check_for_update() + + if update is None: + return + + source_type, _, camera, data = update + + if not camera or source_type != EventTypeEnum.tracked_object: + return + + camera_config = self.config.cameras[camera] + if data["id"] not in self.tracked_events: + self.tracked_events[data["id"]] = [] + + # Create our own thumbnail based on the bounding box and the frame time + try: + frame_id = f"{camera}{data['frame_time']}" + yuv_frame = self.frame_manager.get(frame_id, camera_config.frame_shape_yuv) + data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"]) + self.tracked_events[data["id"]].append(data) + self.frame_manager.close(frame_id) + except FileNotFoundError: + pass + + def _process_finalized(self) -> None: + """Process the end of an event.""" + while True: + ended = self.event_end_subscriber.check_for_update() + + if ended == None: + break + + event_id, camera, updated_db = ended + camera_config = self.config.cameras[camera] + + if updated_db: + try: + event: Event = Event.get(Event.id == event_id) + except DoesNotExist: + continue + + # Skip the event if not an object + if event.data.get("type") != "object": + continue + + # Extract valid event metadata + metadata = get_metadata(event) + thumbnail = base64.b64decode(event.thumbnail) + + # Embed the thumbnail + self._embed_thumbnail(event_id, thumbnail, metadata) + + if ( + camera_config.genai.enabled + and self.genai_client is not None + and event.data.get("description") is None + ): + # Generate the description. Call happens in a thread since it is network bound. + threading.Thread( + target=self._embed_description, + name=f"_embed_description_{event.id}", + daemon=True, + args=( + event, + [ + data["thumbnail"] + for data in self.tracked_events[event_id] + ] + if len(self.tracked_events.get(event_id, [])) > 0 + else [thumbnail], + metadata, + ), + ).start() + + # Delete tracked events based on the event_id + if event_id in self.tracked_events: + del self.tracked_events[event_id] + + def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: + """Return jpg thumbnail of a region of the frame.""" + frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) + region = calculate_region( + frame.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4 + ) + frame = frame[region[1] : region[3], region[0] : region[2]] + width = int(height * frame.shape[1] / frame.shape[0]) + frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA) + ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100]) + + if ret: + return jpg.tobytes() + + return None + + def _embed_thumbnail(self, event_id: str, thumbnail: bytes, metadata: dict) -> None: + """Embed the thumbnail for an event.""" + + # Encode the thumbnail + img = np.array(Image.open(io.BytesIO(thumbnail)).convert("RGB")) + self.embeddings.thumbnail.upsert( + images=[img], + metadatas=[metadata], + ids=[event_id], + ) + + def _embed_description( + self, event: Event, thumbnails: list[bytes], metadata: dict + ) -> None: + """Embed the description for an event.""" + + description = self.genai_client.generate_description(thumbnails, metadata) + + if description is None: + logger.debug("Failed to generate description for %s", event.id) + return + + # fire and forget description update + self.requestor.send_data( + UPDATE_EVENT_DESCRIPTION, + {"id": event.id, "description": description}, + ) + + # Encode the description + self.embeddings.description.upsert( + documents=[description], + metadatas=[metadata], + ids=[event.id], + ) + + logger.debug( + "Generated description for %s (%d images): %s", + event.id, + len(thumbnails), + description, + ) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index fca16f364a..2d21c5797e 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -221,7 +221,7 @@ def detect_audio(self, audio) -> None: audio_detections.append(label) # send audio detection data - self.detection_publisher.send_data( + self.detection_publisher.publish( ( self.config.name, datetime.datetime.now().timestamp(), diff --git a/frigate/events/cleanup.py b/frigate/events/cleanup.py index 88b4f0de66..12b00f620b 100644 --- a/frigate/events/cleanup.py +++ b/frigate/events/cleanup.py @@ -10,6 +10,7 @@ from frigate.config import FrigateConfig from frigate.const import CLIPS_DIR +from frigate.embeddings.embeddings import Embeddings from frigate.models import Event, Timeline logger = logging.getLogger(__name__) @@ -26,6 +27,7 @@ def __init__(self, config: FrigateConfig, stop_event: MpEvent): self.name = "event_cleanup" self.config = config self.stop_event = stop_event + self.embeddings = Embeddings() self.camera_keys = list(self.config.cameras.keys()) self.removed_camera_labels: list[str] = None self.camera_labels: dict[str, dict[str, any]] = {} @@ -197,9 +199,20 @@ def run(self) -> None: self.expire(EventCleanupType.snapshots) # drop events from db where has_clip and has_snapshot are false - delete_query = Event.delete().where( - Event.has_clip == False, Event.has_snapshot == False + events = ( + Event.select() + .where(Event.has_clip == False, Event.has_snapshot == False) + .iterator() ) - delete_query.execute() + events_to_delete = [e.id for e in events] + if len(events_to_delete) > 0: + chunk_size = 50 + for i in range(0, len(events_to_delete), chunk_size): + chunk = events_to_delete[i : i + chunk_size] + Event.delete().where(Event.id << chunk).execute() + + if self.config.semantic_search.enabled: + self.embeddings.thumbnail.delete(ids=chunk) + self.embeddings.description.delete(ids=chunk) logger.info("Exiting event cleanup...") diff --git a/frigate/events/external.py b/frigate/events/external.py index 46ce6f12c5..00f7cee4f5 100644 --- a/frigate/events/external.py +++ b/frigate/events/external.py @@ -86,7 +86,7 @@ def create_manual_event( if source_type == "api": self.event_camera[event_id] = camera - self.detection_updater.send_data( + self.detection_updater.publish( ( camera, now, @@ -115,7 +115,7 @@ def finish_manual_event(self, event_id: str, end_time: float) -> None: ) if event_id in self.event_camera: - self.detection_updater.send_data( + self.detection_updater.publish( ( self.event_camera[event_id], end_time, diff --git a/frigate/events/maintainer.py b/frigate/events/maintainer.py index 0ef3ffaaca..e83194ede3 100644 --- a/frigate/events/maintainer.py +++ b/frigate/events/maintainer.py @@ -237,7 +237,7 @@ def handle_object_detection( if event_type == EventStateEnum.end: del self.events_in_process[event_data["id"]] - self.event_end_publisher.publish((event_data["id"], camera)) + self.event_end_publisher.publish((event_data["id"], camera, updated_db)) def handle_external_detection( self, event_type: EventStateEnum, event_data: Event diff --git a/frigate/genai/__init__.py b/frigate/genai/__init__.py new file mode 100644 index 0000000000..3761fa62fc --- /dev/null +++ b/frigate/genai/__init__.py @@ -0,0 +1,63 @@ +"""Generative AI module for Frigate.""" + +import importlib +import os +from typing import Optional + +from frigate.config import GenAIConfig, GenAIProviderEnum + +PROVIDERS = {} + + +def register_genai_provider(key: GenAIProviderEnum): + """Register a GenAI provider.""" + + def decorator(cls): + PROVIDERS[key] = cls + return cls + + return decorator + + +class GenAIClient: + """Generative AI client for Frigate.""" + + def __init__(self, genai_config: GenAIConfig, timeout: int = 60) -> None: + self.genai_config: GenAIConfig = genai_config + self.timeout = timeout + self.provider = self._init_provider() + + def generate_description( + self, thumbnails: list[bytes], metadata: dict[str, any] + ) -> Optional[str]: + """Generate a description for the frame.""" + prompt = self.genai_config.object_prompts.get( + metadata["label"], self.genai_config.prompt + ).format(**metadata) + return self._send(prompt, thumbnails) + + def _init_provider(self): + """Initialize the client.""" + return None + + def _send(self, prompt: str, images: list[bytes]) -> Optional[str]: + """Submit a request to the provider.""" + return None + + +def get_genai_client(genai_config: GenAIConfig) -> Optional[GenAIClient]: + """Get the GenAI client.""" + if genai_config.enabled: + load_providers() + provider = PROVIDERS.get(genai_config.provider) + if provider: + return provider(genai_config) + return None + + +def load_providers(): + package_dir = os.path.dirname(__file__) + for filename in os.listdir(package_dir): + if filename.endswith(".py") and filename != "__init__.py": + module_name = f"frigate.genai.{filename[:-3]}" + importlib.import_module(module_name) diff --git a/frigate/genai/gemini.py b/frigate/genai/gemini.py new file mode 100644 index 0000000000..f5b7bd2df1 --- /dev/null +++ b/frigate/genai/gemini.py @@ -0,0 +1,49 @@ +"""Gemini Provider for Frigate AI.""" + +from typing import Optional + +import google.generativeai as genai +from google.api_core.exceptions import GoogleAPICallError + +from frigate.config import GenAIProviderEnum +from frigate.genai import GenAIClient, register_genai_provider + + +@register_genai_provider(GenAIProviderEnum.gemini) +class GeminiClient(GenAIClient): + """Generative AI client for Frigate using Gemini.""" + + provider: genai.GenerativeModel + + def _init_provider(self): + """Initialize the client.""" + genai.configure(api_key=self.genai_config.api_key) + return genai.GenerativeModel(self.genai_config.model) + + def _send(self, prompt: str, images: list[bytes]) -> Optional[str]: + """Submit a request to Gemini.""" + data = [ + { + "mime_type": "image/jpeg", + "data": img, + } + for img in images + ] + [prompt] + try: + response = self.provider.generate_content( + data, + generation_config=genai.types.GenerationConfig( + candidate_count=1, + ), + request_options=genai.types.RequestOptions( + timeout=self.timeout, + ), + ) + except GoogleAPICallError: + return None + try: + description = response.text.strip() + except ValueError: + # No description was generated + return None + return description diff --git a/frigate/genai/ollama.py b/frigate/genai/ollama.py new file mode 100644 index 0000000000..09bcad0c53 --- /dev/null +++ b/frigate/genai/ollama.py @@ -0,0 +1,41 @@ +"""Ollama Provider for Frigate AI.""" + +import logging +from typing import Optional + +from httpx import TimeoutException +from ollama import Client as ApiClient +from ollama import ResponseError + +from frigate.config import GenAIProviderEnum +from frigate.genai import GenAIClient, register_genai_provider + +logger = logging.getLogger(__name__) + + +@register_genai_provider(GenAIProviderEnum.ollama) +class OllamaClient(GenAIClient): + """Generative AI client for Frigate using Ollama.""" + + provider: ApiClient + + def _init_provider(self): + """Initialize the client.""" + client = ApiClient(host=self.genai_config.base_url, timeout=self.timeout) + response = client.pull(self.genai_config.model) + if response["status"] != "success": + logger.error("Failed to pull %s model from Ollama", self.genai_config.model) + return None + return client + + def _send(self, prompt: str, images: list[bytes]) -> Optional[str]: + """Submit a request to Ollama""" + try: + result = self.provider.generate( + self.genai_config.model, + prompt, + images=images, + ) + return result["response"].strip() + except (TimeoutException, ResponseError): + return None diff --git a/frigate/genai/openai.py b/frigate/genai/openai.py new file mode 100644 index 0000000000..d0178df8bc --- /dev/null +++ b/frigate/genai/openai.py @@ -0,0 +1,51 @@ +"""OpenAI Provider for Frigate AI.""" + +import base64 +from typing import Optional + +from httpx import TimeoutException +from openai import OpenAI + +from frigate.config import GenAIProviderEnum +from frigate.genai import GenAIClient, register_genai_provider + + +@register_genai_provider(GenAIProviderEnum.openai) +class OpenAIClient(GenAIClient): + """Generative AI client for Frigate using OpenAI.""" + + provider: OpenAI + + def _init_provider(self): + """Initialize the client.""" + return OpenAI(api_key=self.genai_config.api_key) + + def _send(self, prompt: str, images: list[bytes]) -> Optional[str]: + """Submit a request to OpenAI.""" + encoded_images = [base64.b64encode(image).decode("utf-8") for image in images] + try: + result = self.provider.chat.completions.create( + model=self.genai_config.model, + messages=[ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image}", + "detail": "low", + }, + } + for image in encoded_images + ] + + [prompt], + }, + ], + timeout=self.timeout, + ) + except TimeoutException: + return None + if len(result.choices) > 0: + return result.choices[0].message.content.strip() + return None diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 7ac0b7276c..dcf6014fcc 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -1187,7 +1187,7 @@ def run(self): ] # publish info on this frame - self.detection_publisher.send_data( + self.detection_publisher.publish( ( camera, frame_time, @@ -1274,7 +1274,7 @@ def run(self): if not update: break - event_id, camera = update + event_id, camera, _ = update self.camera_states[camera].finished(event_id) self.requestor.stop() diff --git a/frigate/output/output.py b/frigate/output/output.py index e458d3242e..e0e7d0cac0 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -80,7 +80,7 @@ def receiveSignal(signalNumber, frame): websocket_thread.start() while not stop_event.is_set(): - (topic, data) = detection_subscriber.get_data(timeout=1) + (topic, data) = detection_subscriber.check_for_update(timeout=1) if not topic: continue @@ -134,7 +134,7 @@ def receiveSignal(signalNumber, frame): move_preview_frames("clips") while True: - (topic, data) = detection_subscriber.get_data(timeout=0) + (topic, data) = detection_subscriber.check_for_update(timeout=0) if not topic: break diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 50ead905c4..2d12e2c320 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -470,7 +470,7 @@ def run(self) -> None: stale_frame_count_threshold = 10 # empty the object recordings info queue while True: - (topic, data) = self.detection_subscriber.get_data( + (topic, data) = self.detection_subscriber.check_for_update( timeout=QUEUE_READ_TIMEOUT ) diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 8fb1df362c..3e0a2b170b 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -424,7 +424,7 @@ def run(self) -> None: camera_name = updated_topic.rpartition("/")[-1] self.config.cameras[camera_name].record = updated_record_config - (topic, data) = self.detection_subscriber.get_data(timeout=1) + (topic, data) = self.detection_subscriber.check_for_update(timeout=1) if not topic: continue