Skip to content

Commit

Permalink
[ntfy] [Frigate] Synchronize JSON event and snapshot image receive order
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Apr 27, 2023
1 parent c891585 commit 2433cd4
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 49 deletions.
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ in progress
- [ntfy] Add dedicated service plugin ``ntfy``
- [ntfy] Use RFC 2047 for encoding HTTP header values. Thanks, @binwiederhier.
- [ntfy] Add more fields: icon, cache, firebase, unifiedpush
- [ntfy] Improve example/tutorial about Frigate event notifications
- [ntfy] Also interpolate outbound ntfy option fields
- [ntfy] [Frigate] Improve example/tutorial about Frigate event notifications
- [ntfy] [Frigate] Synchronize JSON event and snapshot image receive order


2023-04-11 0.33.0
Expand Down
11 changes: 11 additions & 0 deletions examples/frigate/frigate.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ launch = ntfy, store-image

status_publish = True

# This scenario needs two workers, because it needs the headroom of two threads
# running in parallel, to synchronize _two_ distinct Frigate events with each other,
# in order to send out _one_ notification.
num_workers = 2



# =====================
Expand All @@ -22,6 +27,12 @@ targets = {
'url': 'http://username:password@localhost:5555/frigate-testdrive',
'file': '/tmp/mqttwarn-frigate-{camera}-{label}.png',
'click': 'https://httpbin.org/anything?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}',
# Wait for the file to arrive for three quarters of a second, and delete it after reading.
'__settings__': {
'file_retry_tries': 10,
'file_retry_interval': 0.075,
'file_unlink': True,
}
}
}

Expand Down
43 changes: 33 additions & 10 deletions examples/frigate/test_frigate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Full system test verifying mqttwarn end-to-end, using an MQTT broker, and an ntfy instance.
"""
import os
import subprocess
import sys
from pathlib import Path

Expand All @@ -29,16 +30,28 @@
ASSETS = HERE.joinpath("assets")


def test_frigate_event_new(mosquitto, ntfy_service, caplog, capmqtt):
@pytest.fixture(scope="function", autouse=True)
def reset_filestore():
"""
Make sure there are no temporary spool files around before starting each test case.
"""
for filepath in Path("/tmp").glob("mqttwarn-frigate-*"):
os.unlink(filepath)


def test_frigate_with_attachment(mosquitto, ntfy_service, caplog, capmqtt):
"""
A full system test verifying the "Forwarding Frigate events to ntfy" example.
This test case submits the `frigate-event-new-good.json` file as MQTT event message.
This test case submits the `frigate-event-full.json` file as MQTT event message.
"""

# Bootstrap the core machinery without MQTT.
config = load_configuration(configfile=HERE / "frigate.ini")
bootstrap(config=config)

# Get image payload.
payload_image = get_goat_image()

# Add MQTT.
mqttc = connect()

Expand All @@ -47,12 +60,11 @@ def test_frigate_event_new(mosquitto, ntfy_service, caplog, capmqtt):

# Publish the JSON event message.
# cat frigate-event-new-good.json | jq -c | mosquitto_pub -t 'frigate/events' -l
payload_event = open(ASSETS / "frigate-event-new-good.json").read()
payload_event = open(ASSETS / "frigate-event-full.json").read()
capmqtt.publish(topic="frigate/events", payload=payload_event)

# Publish the snapshot image.
# mosquitto_pub -f goat.png -t 'frigate/cam-testdrive/goat/snapshot'
payload_image = get_goat_image()
capmqtt.publish(topic="frigate/cam-testdrive/goat/snapshot", payload=payload_image)

# Make mqttwarn receive and process the message.
Expand Down Expand Up @@ -102,14 +114,15 @@ def test_frigate_event_new(mosquitto, ntfy_service, caplog, capmqtt):
@pytest.mark.parametrize(
"jsonfile", ["frigate-event-full.json", "frigate-event-new-good.json", "frigate-event-update-good.json"]
)
def test_frigate_event_with_notification(mosquitto, ntfy_service, caplog, capmqtt, jsonfile):
def test_frigate_with_notification(mosquitto, ntfy_service, caplog, capmqtt, jsonfile):
"""
A full system test verifying the "Forwarding Frigate events to ntfy" example.
This test case submits JSON files which raise a notification.
This test case submits JSON files which trigger a notification.
"""

# Bootstrap the core machinery without MQTT.
config = load_configuration(configfile=HERE / "frigate.ini")
turn_off_retries(config)
bootstrap(config=config)

# Add MQTT.
Expand All @@ -135,8 +148,7 @@ def test_frigate_event_with_notification(mosquitto, ntfy_service, caplog, capmqt
"Headers: {"
"'Click': 'https://httpbin.org/anything?camera=cam-testdrive&label=goat&zone=lawn', "
"'Title': '=?utf-8?q?goat_entered_lawn_at_2023-04-06_14=3A31=3A46=2E638857+00=3A00?=', "
"'Message': '=?utf-8?q?goat_was_in_barn_before?=', "
"'Filename': 'mqttwarn-frigate-cam-testdrive-goat.png'}" in caplog.messages
"'Message': '=?utf-8?q?goat_was_in_barn_before?='}" in caplog.messages
)

# assert "Sent ntfy notification to 'http://localhost:5555'." in caplog.messages
Expand All @@ -156,10 +168,10 @@ def test_frigate_event_with_notification(mosquitto, ntfy_service, caplog, capmqt
"frigate-event-new-ignored.json",
],
)
def test_frigate_event_without_notification(mosquitto, caplog, capmqtt, jsonfile):
def test_frigate_without_notification(mosquitto, caplog, capmqtt, jsonfile):
"""
A full system test verifying the "Forwarding Frigate events to ntfy" example.
This test case submits JSON files which should not raise a notification.
This test case submits JSON files which should not trigger a notification.
"""

# Bootstrap the core machinery without MQTT.
Expand Down Expand Up @@ -197,3 +209,14 @@ def get_goat_image() -> bytes:
return requests.get(
"https://user-images.githubusercontent.com/453543/231550862-5a64ac7c-bdfa-4509-86b8-b1a770899647.png"
).content


def turn_off_retries(config):
"""
For test cases which do not aim to submit any attachments, adjust the configuration
to not wait for it to appear. Otherwise, it would take too long, and the verification
would fail.
"""
targets = config.getdict("config:ntfy", "targets")
targets["test"]["__settings__"]["file_retry_tries"] = None
config.set("config:ntfy", "targets", targets)
47 changes: 25 additions & 22 deletions mqttwarn/services/ntfy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from funcy import project, merge

from mqttwarn.model import Service, ProcessorItem
from mqttwarn.util import Formatter
from mqttwarn.util import Formatter, asbool, load_file

DataDict = t.Dict[str, t.Union[str, bytes]]

Expand Down Expand Up @@ -127,6 +127,7 @@ def decode_jobitem(item: ProcessorItem) -> NtfyRequest:
title = item.title
body = item.message
options: t.Dict[str, str]
settings: t.Dict[str, t.Union[str, int, float, bool]] = {}

if isinstance(item.addrs, str):
item.addrs = {"url": item.addrs}
Expand All @@ -135,11 +136,18 @@ def decode_jobitem(item: ProcessorItem) -> NtfyRequest:
else:
raise TypeError(f"Unable to handle `targets` address descriptor data type `{type(item.addrs).__name__}`: {item.addrs}")

# Decode options from target address descriptor.
options = item.addrs

url = options["url"]
attachment_path = options.get("file")

# Extract settings, purging them from the target address descriptor afterwards.
if "__settings__" in options:
if isinstance(options["__settings__"], t.Dict):
settings = dict(options["__settings__"])
del options["__settings__"]

# Collect ntfy fields.
fields: DataDict = OrderedDict()

Expand All @@ -153,10 +161,22 @@ def decode_jobitem(item: ProcessorItem) -> NtfyRequest:
# Attach a file, or not.
attachment_data = None
if attachment_path:
attachment_path, attachment_data = load_attachment(attachment_path, item.data)
if attachment_data:
# TODO: Optionally derive attachment file name from title, using `slugify(title)`.
fields.setdefault("filename", Path(attachment_path).name)
try:
attachment_path = attachment_path.format(**item.data or {})
try:
attachment_data = load_file(
path=attachment_path,
retry_tries=settings.get("file_retry_tries"),
retry_interval=settings.get("file_retry_interval"),
unlink=asbool(settings.get("file_unlink")),
)
if attachment_data:
# TODO: Optionally derive attachment file name from title, using `slugify(title)`.
fields.setdefault("filename", Path(attachment_path).name)
except Exception as ex:
logger.exception(f"ntfy: Attaching local file failed. Reason: {ex}")
except:
logger.exception("ntfy: Computing attachment file name failed")

ntfy_request = NtfyRequest(
url=url,
Expand Down Expand Up @@ -197,23 +217,6 @@ def obtain_ntfy_fields(item: ProcessorItem) -> DataDict:
return fields


def load_attachment(path: str, tplvars: t.Optional[DataDict]) -> t.Tuple[str, t.Optional[t.IO]]:
"""
Load attachment file from filesystem gracefully.
"""
data = None
try:
path = path.format(**tplvars or {})
except:
logger.exception(f"ntfy: Computing attachment file name failed")
if path:
try:
data = open(path, "rb")
except:
logger.exception(f"ntfy: Accessing attachment file failed: {path}")
return path, data


def ascii_clean(data: t.Union[str, bytes]) -> str:
"""
Return ASCII-clean variant of input string.
Expand Down
23 changes: 23 additions & 0 deletions mqttwarn/util.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
# -*- coding: utf-8 -*-
# (c) 2014-2022 The mqttwarn developers
import functools
import hashlib
import imp
import json
import logging
import os
import re
import string
import typing as t

import funcy
import pkg_resources
from six import string_types

logger = logging.getLogger(__name__)


class Formatter(string.Formatter):
"""
Expand Down Expand Up @@ -224,3 +229,21 @@ def truncate(s: t.Union[str, bytes], limit: int = 200, ellipsis="...") -> str:
if len(s) > limit:
return s[:limit].strip() + ellipsis
return s


def load_file(path: str, retry_tries=None, retry_interval=0.075, unlink=False) -> t.IO[bytes]:
"""
Load file content from filesystem gracefully, with optional retrying.
"""
call = functools.partial(open, path, "rb")
if retry_tries:
logger.info(f"Retry loading file {path} for {retry_tries} times")
payload = funcy.retry(tries=int(retry_tries), timeout=float(retry_interval))(call)()
else:
payload = call()
if unlink:
try:
os.unlink(path)
except:
pass
return payload
41 changes: 25 additions & 16 deletions tests/services/test_ntfy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
dict_ascii_clean,
dict_with_titles,
encode_rfc2047,
load_attachment,
obtain_ntfy_fields,
)
from mqttwarn.util import load_module_by_name
Expand Down Expand Up @@ -76,7 +75,7 @@ def test_ntfy_decode_jobitem_attachment_success(attachment_dummy):
assert ntfy_request.attachment_data.read() == b"foo"


def test_ntfy_decode_jobitem_attachment_failure(caplog):
def test_ntfy_decode_jobitem_attachment_not_found_failure(caplog):
"""
Test the `decode_jobitem` function with an invalid attachment.
"""
Expand All @@ -93,7 +92,30 @@ def test_ntfy_decode_jobitem_attachment_failure(caplog):
assert "filename" not in ntfy_request.fields
assert ntfy_request.attachment_data is None

assert "ntfy: Accessing attachment file failed: /tmp/mqttwarn-random-unknown" in caplog.messages
assert (
"ntfy: Attaching local file failed. Reason: [Errno 2] No such file or directory: '/tmp/mqttwarn-random-unknown'"
in caplog.messages
)


def test_ntfy_decode_jobitem_attachment_interpolate_name_failure(caplog):
"""
Check how the `decode_jobitem` function fails when the template variables are invalid, or interpolation fails.
"""

item = Item(
addrs={"url": "http://localhost:9999/testdrive", "file": "/tmp/mqttwarn-random-{foobar}"},
)

ntfy_request = decode_jobitem(item)

assert ntfy_request.url == "http://localhost:9999/testdrive"
assert ntfy_request.options["url"] == "http://localhost:9999/testdrive"
assert ntfy_request.options["file"] == "/tmp/mqttwarn-random-{foobar}"
assert "filename" not in ntfy_request.fields
assert ntfy_request.attachment_data is None

assert "ntfy: Computing attachment file name failed" in caplog.messages


def test_ntfy_decode_jobitem_attachment_with_filename_success(attachment_dummy):
Expand Down Expand Up @@ -193,19 +215,6 @@ def test_ntfy_obtain_ntfy_fields_precedence():
assert outdata["message"] == "msg-config"


def test_ntfy_load_attachment_tplvar_failure(caplog):
"""
Check how the `load_attachment` helper function fails when the template variables are invalid.
"""
path, data = load_attachment(None, None)

assert path is None
assert data is None

assert "ntfy: Computing attachment file name failed" in caplog.messages
assert "AttributeError: 'NoneType' object has no attribute 'format'" in caplog.text


def test_ntfy_dict_with_titles():
"""
Test the `dict_with_titles` helper function.
Expand Down

0 comments on commit 2433cd4

Please sign in to comment.