From 71ae3f45fb6b1f1d480759462a2b9de2717e4e7c Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Wed, 23 Dec 2020 10:31:37 +0100 Subject: [PATCH 01/10] initial setup --- config.local.yaml | 11 ++++++ config.yaml | 5 +++ src/ambianic/notification.py | 76 ++++++++++++++++++++++++++++++++++++ src/ambianic/server.py | 2 + 4 files changed, 94 insertions(+) create mode 100644 src/ambianic/notification.py diff --git a/config.local.yaml b/config.local.yaml index 77f7945b..7cd07da2 100644 --- a/config.local.yaml +++ b/config.local.yaml @@ -10,6 +10,17 @@ default: timeline: event_log: ./data/timeline-event-log.yaml + notifications: + providers: {} + template: + title: "[Ambianic.ai] {event} recognized" + body: > + {event} event has been recognized by the system. + Visit https://ui.ambianic.ai/ for more details. + labels: + fall_detection: Fall + object_detection: Object + sources: {} ai_models: diff --git a/config.yaml b/config.yaml index 626df2cf..770bb0ba 100644 --- a/config.yaml +++ b/config.yaml @@ -11,6 +11,11 @@ logging: file: ./data/ambianic-log.txt level: INFO +# Store notifications configuration +notifications: + email: mailto://userid:pass@domain.com + + # Pipeline event timeline configuration timeline: event_log: ./data/timeline-event-log.yaml diff --git a/src/ambianic/notification.py b/src/ambianic/notification.py new file mode 100644 index 00000000..eb70ede2 --- /dev/null +++ b/src/ambianic/notification.py @@ -0,0 +1,76 @@ +from ambianic.util import ServiceExit, ThreadedJob, ManagedService +import time +import apprise +import logging + +log = logging.getLogger(__name__) + + +class NotificationJob(ManagedService): + def __init__(self, config: dict): + # Create an Apprise instance + self.apobj = apprise.Apprise() + # Create an Config instance + self.config = config.get("notifications", {}) + for name, cfg in config.get("providers", {}).items(): + self.apobj.add(cfg, tag=name) + + def notify( + self, + event: str, + providers: list = ["all"], + title: str = None, + message: str = None, + attach: list = [] + ): + + labels = self.config.get("labels", {}) + templates = self.config.get("templates", {}) + + if title is None: + title = templates.get("title", "[Ambianic.ai] New {event} event" ) + + if message is None: + message = templates.get("message", "New {event} recognized") + + template_args = { + "event_type": event, + "event": labels.get(event, event), + } + title = title.safe_substitute(**template_args) + message = message.safe_substitute(**template_args) + + for provider in providers: + self.apobj.notify(message, title=title, tag=provider) + log.debug("Sent notification %s to %s" % (event, provider)) + +class NotificationServer(ManagedService): + def __init__(self, config): + self.config = config + self.notification_job = None + + def start(self, **kwargs): + log.info('Notification server job starting...') + f = NotificationJob(self.config) + self.notification_job = ThreadedJob(f) + self.notification_job.start() + log.info('NotificationFlask server job started') + + def healthcheck(self): + # Note: Implement actual health check for Flask + # See if the /healthcheck URL returns a 200 quickly + return time.monotonic(), True + + def heal(self): + """Heal the server. + + TODO: Keep an eye for potential scenarios that cause this server to + become unresponsive. + """ + + def stop(self): + if self.notification_job: + log.info('Notification server job stopping...') + self.notification_job.stop() + self.notification_job.join() + log.info('Flask server job stopped.') diff --git a/src/ambianic/server.py b/src/ambianic/server.py index a4d22286..c027258a 100644 --- a/src/ambianic/server.py +++ b/src/ambianic/server.py @@ -6,6 +6,7 @@ import time from watchdog.observers import Observer +from ambianic.notification import NotificationServer from ambianic.pipeline import timeline from ambianic.pipeline.interpreter import PipelineServer from ambianic.util import ServiceExit @@ -19,6 +20,7 @@ MAIN_HEARTBEAT_LOG_INTERVAL = 5 ROOT_SERVERS = { 'pipelines': PipelineServer, + 'notifications': NotificationServer, 'web': FlaskServer, } From 92e7ef1bcd1198c26de30ae1ff79c47d9c47c5e2 Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Tue, 29 Dec 2020 07:38:09 +0100 Subject: [PATCH 02/10] refactor notification as store function --- build/requirements.txt | 1 + config.local.yaml | 21 +++++--- config.yaml | 15 ++++-- src/ambianic/__init__.py | 6 ++- src/ambianic/notification.py | 89 +++++++++++++++++----------------- src/ambianic/pipeline/store.py | 26 ++++++++++ src/ambianic/server.py | 4 +- src/run-dev.sh | 1 + tests/test-config-secrets.yaml | 4 +- 9 files changed, 105 insertions(+), 62 deletions(-) diff --git a/build/requirements.txt b/build/requirements.txt index c28cc5d1..d4d784d2 100644 --- a/build/requirements.txt +++ b/build/requirements.txt @@ -36,3 +36,4 @@ Werkzeug>=0.15.3 concurrent-log-handler>=0.9 watchdog>=0.10 dynaconf>=3.1 +apprise>=0.8.9 \ No newline at end of file diff --git a/config.local.yaml b/config.local.yaml index 7cd07da2..660199ff 100644 --- a/config.local.yaml +++ b/config.local.yaml @@ -13,9 +13,9 @@ default: notifications: providers: {} template: - title: "[Ambianic.ai] {event} recognized" + title: "[Ambianic.ai] ${event} recognized" body: > - {event} event has been recognized by the system. + ${event} event has been recognized by the system. Visit https://ui.ambianic.ai/ for more details. labels: fall_detection: Fall @@ -26,14 +26,19 @@ default: ai_models: image_detection: model: - tflite: ai_models/mobilenet_ssd_v2_coco_quant_postprocess.tflite - edgetpu: ai_models/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite - labels: ai_models/coco_labels.txt + tflite: /opt/ambianic-edge/ai_models/mobilenet_ssd_v2_coco_quant_postprocess.tflite + edgetpu: /opt/ambianic-edge/ai_models/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite + labels: /opt/ambianic-edge/ai_models/coco_labels.txt face_detection: model: - tflite: ai_models/mobilenet_ssd_v2_face_quant_postprocess.tflite - edgetpu: ai_models/mobilenet_ssd_v2_face_quant_postprocess_edgetpu.tflite - labels: ai_models/coco_labels.txt + tflite: /opt/ambianic-edge/ai_models/mobilenet_ssd_v2_face_quant_postprocess.tflite + edgetpu: /opt/ambianic-edge/ai_models/mobilenet_ssd_v2_face_quant_postprocess_edgetpu.tflite + labels: /opt/ambianic-edge/ai_models/coco_labels.txt top_k: 2 + fall_detection: + model: + tflite: /opt/ambianic-edge/ai_models/posenet_mobilenet_v1_100_257x257_multi_kpt_stripped.tflite + edgetpu: /opt/ambianic-edge/ai_models/posenet_mobilenet_v1_075_721_1281_quant_decoder_edgetpu.tflite + labels: /opt/ambianic-edge/ai_models/pose_labels.txt pipelines: {} diff --git a/config.yaml b/config.yaml index 770bb0ba..55cfd513 100644 --- a/config.yaml +++ b/config.yaml @@ -11,10 +11,14 @@ logging: file: ./data/ambianic-log.txt level: INFO -# Store notifications configuration -notifications: - email: mailto://userid:pass@domain.com - +# Store notifications provider configuration +# see https://github.com/caronc/apprise#popular-notification-services for syntax examples +# notifications: +# providers: +# email: mailto://userid:pass@domain.com +# alert_fall: +# - mailto://userid:pass@domain.com +# - json://hostname/a/path/to/post/to # Pipeline event timeline configuration timeline: @@ -82,5 +86,8 @@ pipelines: - save_detections: # save samples from the inference results positive_interval: 10 idle_interval: 600000 + notify: # notify a thirdy party service + providers: + - alert_fall diff --git a/src/ambianic/__init__.py b/src/ambianic/__init__.py index 845986c7..3b58720f 100644 --- a/src/ambianic/__init__.py +++ b/src/ambianic/__init__.py @@ -27,7 +27,11 @@ def __merge_secrets(config:Union[Dynaconf, DynaBox], src_config:Dynaconf = None) if isinstance(val, dict): __merge_secrets(val, src_config) continue - if isinstance(val, str) and val[0:2] == "${": + # NOTE value must be an exact match to avoid interfering with other templates + if ( + isinstance(val, str) and + (val[0:2] == "${" and val[-1] == "}") + ): ref_key = val[2:-1] ref_val = src_config.get(ref_key, None) if ref_val is not None: diff --git a/src/ambianic/notification.py b/src/ambianic/notification.py index eb70ede2..7689418f 100644 --- a/src/ambianic/notification.py +++ b/src/ambianic/notification.py @@ -1,27 +1,48 @@ -from ambianic.util import ServiceExit, ThreadedJob, ManagedService -import time -import apprise +"""Utilities to send notifications.""" + import logging +import apprise +import os +import ambianic log = logging.getLogger(__name__) +class Notification: + def __init__(self, event:str="notification", data:dict={}, providers:list=["all"]): + self.event: str = event + self.providers: list = providers + self.title: str = None + self.message: str = None + self.attach: list = [] + self.data: dict = data -class NotificationJob(ManagedService): - def __init__(self, config: dict): - # Create an Apprise instance + def add_attachments(self, *args): + self.attach.append(*args) + + def to_dict(self) -> dict: + return dict(vars(self)) + +class NotificationHandler: + def __init__(self, config: dict = None): + if config is None: + config = ambianic.config self.apobj = apprise.Apprise() - # Create an Config instance self.config = config.get("notifications", {}) for name, cfg in config.get("providers", {}).items(): - self.apobj.add(cfg, tag=name) + if not isinstance(cfg, list): + cfg = [cfg] + for provider in cfg: + if not self.apobj.add(provider, tag=name): + log.warning("Failed to add notification provider: %s=%s" % (name, provider)) - def notify( + def send( self, event: str, providers: list = ["all"], title: str = None, message: str = None, - attach: list = [] + attach: list = [], + data: dict = {}, ): labels = self.config.get("labels", {}) @@ -33,44 +54,22 @@ def notify( if message is None: message = templates.get("message", "New {event} recognized") + attachments = [] + for a in attach: + if not os.path.exists(a) or not os.path.isfile(a): + log.warning("Attachment is not a valid file %s") + continue + attachments.append(a) + template_args = { "event_type": event, "event": labels.get(event, event), - } - title = title.safe_substitute(**template_args) - message = message.safe_substitute(**template_args) + } + data + for key, value in template_args.items(): + title = title.replace("${%s}" % key, value) + message = message.replace("${%s}" % key, value) for provider in providers: - self.apobj.notify(message, title=title, tag=provider) + self.apobj.notify(message, title=title, + tag=provider, attach=attachments) log.debug("Sent notification %s to %s" % (event, provider)) - -class NotificationServer(ManagedService): - def __init__(self, config): - self.config = config - self.notification_job = None - - def start(self, **kwargs): - log.info('Notification server job starting...') - f = NotificationJob(self.config) - self.notification_job = ThreadedJob(f) - self.notification_job.start() - log.info('NotificationFlask server job started') - - def healthcheck(self): - # Note: Implement actual health check for Flask - # See if the /healthcheck URL returns a 200 quickly - return time.monotonic(), True - - def heal(self): - """Heal the server. - - TODO: Keep an eye for potential scenarios that cause this server to - become unresponsive. - """ - - def stop(self): - if self.notification_job: - log.info('Notification server job stopping...') - self.notification_job.stop() - self.notification_job.join() - log.info('Flask server job stopped.') diff --git a/src/ambianic/pipeline/store.py b/src/ambianic/pipeline/store.py index a8018725..732950f5 100755 --- a/src/ambianic/pipeline/store.py +++ b/src/ambianic/pipeline/store.py @@ -8,6 +8,7 @@ from ambianic import DEFAULT_DATA_DIR from ambianic.pipeline import PipeElement +from ambianic.notification import Notification, NotificationHandler log = logging.getLogger(__name__) @@ -18,6 +19,7 @@ class SaveDetectionSamples(PipeElement): def __init__(self, positive_interval=2, idle_interval=600, + notify=None, **kwargs): """Create SaveDetectionSamples element with the provided arguments. @@ -33,6 +35,7 @@ def __init__(self, """ super().__init__(**kwargs) + log.info('Loading pipe element %r ', self.__class__.__name__) if self.context: self._sys_data_dir = self.context.data_dir @@ -65,6 +68,12 @@ def __init__(self, ii = idle_interval self._idle_interval = datetime.timedelta(seconds=ii) self._time_latest_saved_idle = self._time_latest_saved_detection + + # setup notification handler + self.notification = None + self.notification_providers = notify + if self.notification_providers is not None: + self.notification = NotificationHandler() def _save_sample(self, inf_time=None, @@ -119,6 +128,9 @@ def _save_sample(self, # e = PipelineEvent('Detected Objects', type='ObjectDetection') self.event_log.info('Detection Event', save_json) log.debug("Saved sample (detection event): %r ", save_json) + + self.notify(save_json) + return image_path, json_path def process_sample(self, **sample) -> Iterable[dict]: @@ -172,3 +184,17 @@ def process_sample(self, **sample) -> Iterable[dict]: } log.debug('Passing sample on: %r ', processed_sample) yield processed_sample + + def notify(self, save_json: dict): + if self.notification is None: + return + log.debug("Sending notification(s)..") + # TODO extract inference data + for inference_result in save_json['inference_result']: + data = { + 'id': save_json['id'], + 'label': inference_result['label'], + 'confidence': inference_result['confidence'], + } + notification = Notification(data=data, providers=self.notification_providers) + self.notification.send(notification.to_dict()) \ No newline at end of file diff --git a/src/ambianic/server.py b/src/ambianic/server.py index c027258a..6d5f5805 100644 --- a/src/ambianic/server.py +++ b/src/ambianic/server.py @@ -6,7 +6,6 @@ import time from watchdog.observers import Observer -from ambianic.notification import NotificationServer from ambianic.pipeline import timeline from ambianic.pipeline.interpreter import PipelineServer from ambianic.util import ServiceExit @@ -20,7 +19,6 @@ MAIN_HEARTBEAT_LOG_INTERVAL = 5 ROOT_SERVERS = { 'pipelines': PipelineServer, - 'notifications': NotificationServer, 'web': FlaskServer, } @@ -88,7 +86,7 @@ def _stop_servers(self, servers): def _healthcheck(self, servers): """Check the health of managed servers.""" for s in servers.values(): - latest_heartbeat, status = s.healthcheck() + latest_heartbeat, _ = s.healthcheck() now = time.monotonic() lapse = now - latest_heartbeat if lapse > 1: diff --git a/src/run-dev.sh b/src/run-dev.sh index 1a87cc05..e2689872 100755 --- a/src/run-dev.sh +++ b/src/run-dev.sh @@ -1,4 +1,5 @@ export LD_LIBRARY_PATH=/opt/vc/lib cd /workspace +pip3 install -r build/requirements.txt pip3 install -e src python3 -m ambianic diff --git a/tests/test-config-secrets.yaml b/tests/test-config-secrets.yaml index c10f8a8d..beab9f72 100644 --- a/tests/test-config-secrets.yaml +++ b/tests/test-config-secrets.yaml @@ -2,4 +2,6 @@ question: ${answer} deeeper: question: on: - life: ${answer} \ No newline at end of file + life: ${answer} + whatever: not the expected ${answer} + neither: ${answer} in this case \ No newline at end of file From eac934050b23909049c8ab49ea795da9234ede0f Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Wed, 30 Dec 2020 12:32:24 +0100 Subject: [PATCH 03/10] test passing --- config.yaml | 6 +- src/ambianic/notification.py | 59 ++++++++++-------- src/ambianic/pipeline/store.py | 11 ++-- src/run-dev.sh | 1 - tests/pipeline/test_notify.py | 105 +++++++++++++++++++++++++++++++++ 5 files changed, 147 insertions(+), 35 deletions(-) create mode 100644 tests/pipeline/test_notify.py diff --git a/config.yaml b/config.yaml index 55cfd513..0330e6d6 100644 --- a/config.yaml +++ b/config.yaml @@ -86,8 +86,8 @@ pipelines: - save_detections: # save samples from the inference results positive_interval: 10 idle_interval: 600000 - notify: # notify a thirdy party service - providers: - - alert_fall + # notify: # notify a thirdy party service + # providers: + # - alert_fall diff --git a/src/ambianic/notification.py b/src/ambianic/notification.py index 7689418f..81bf2664 100644 --- a/src/ambianic/notification.py +++ b/src/ambianic/notification.py @@ -8,7 +8,7 @@ log = logging.getLogger(__name__) class Notification: - def __init__(self, event:str="notification", data:dict={}, providers:list=["all"]): + def __init__(self, event:str="detection", data:dict={}, providers:list=["all"]): self.event: str = event self.providers: list = providers self.title: str = None @@ -26,50 +26,59 @@ class NotificationHandler: def __init__(self, config: dict = None): if config is None: config = ambianic.config - self.apobj = apprise.Apprise() + self.apobj = apprise.Apprise(debug=True) self.config = config.get("notifications", {}) - for name, cfg in config.get("providers", {}).items(): + providers = self.config.get("providers", {}) + for name, cfg in providers.items(): if not isinstance(cfg, list): cfg = [cfg] for provider in cfg: if not self.apobj.add(provider, tag=name): log.warning("Failed to add notification provider: %s=%s" % (name, provider)) - def send( - self, - event: str, - providers: list = ["all"], - title: str = None, - message: str = None, - attach: list = [], - data: dict = {}, - ): + def send(self, notification: Notification): - labels = self.config.get("labels", {}) + labels = dict(self.config.get("labels", {})) templates = self.config.get("templates", {}) + title = notification.title if title is None: - title = templates.get("title", "[Ambianic.ai] New {event} event" ) + title = templates.get("title", "[Ambianic.ai] New ${event} event" ) + message = notification.message if message is None: - message = templates.get("message", "New {event} recognized") + message = templates.get( + "message", "New ${event} recognized" + ) attachments = [] - for a in attach: + for a in notification.attach: if not os.path.exists(a) or not os.path.isfile(a): log.warning("Attachment is not a valid file %s") continue attachments.append(a) template_args = { - "event_type": event, - "event": labels.get(event, event), - } + data + "event_type": notification.event, + "event": labels.get(notification.event, notification.event), + } + template_args = {**template_args, **notification.data} + for key, value in template_args.items(): - title = title.replace("${%s}" % key, value) - message = message.replace("${%s}" % key, value) + k = "${%s}" % (str(key)) + v = str(value) + title = title.replace(k, v) + message = message.replace(k, v) - for provider in providers: - self.apobj.notify(message, title=title, - tag=provider, attach=attachments) - log.debug("Sent notification %s to %s" % (event, provider)) + for provider in notification.providers: + ok = self.apobj.notify( + message, + title=title, + tag=provider, + attach=attachments, + ) + if ok: + log.debug("Sent notification for %s to %s" % (notification.event, provider)) + else: + log.warning("Error sending notification for %s to %s" % + (notification.event, provider)) diff --git a/src/ambianic/pipeline/store.py b/src/ambianic/pipeline/store.py index 732950f5..a5cd43c1 100755 --- a/src/ambianic/pipeline/store.py +++ b/src/ambianic/pipeline/store.py @@ -71,8 +71,8 @@ def __init__(self, # setup notification handler self.notification = None - self.notification_providers = notify - if self.notification_providers is not None: + self.notification_config = notify + if self.notification_config is not None and self.notification_config.get("providers"): self.notification = NotificationHandler() def _save_sample(self, @@ -128,9 +128,7 @@ def _save_sample(self, # e = PipelineEvent('Detected Objects', type='ObjectDetection') self.event_log.info('Detection Event', save_json) log.debug("Saved sample (detection event): %r ", save_json) - self.notify(save_json) - return image_path, json_path def process_sample(self, **sample) -> Iterable[dict]: @@ -195,6 +193,7 @@ def notify(self, save_json: dict): 'id': save_json['id'], 'label': inference_result['label'], 'confidence': inference_result['confidence'], + 'datetime': save_json['datetime'], } - notification = Notification(data=data, providers=self.notification_providers) - self.notification.send(notification.to_dict()) \ No newline at end of file + notification = Notification(data=data, providers=self.notification_config["providers"]) + self.notification.send(notification) diff --git a/src/run-dev.sh b/src/run-dev.sh index e2689872..1a87cc05 100755 --- a/src/run-dev.sh +++ b/src/run-dev.sh @@ -1,5 +1,4 @@ export LD_LIBRARY_PATH=/opt/vc/lib cd /workspace -pip3 install -r build/requirements.txt pip3 install -e src python3 -m ambianic diff --git a/tests/pipeline/test_notify.py b/tests/pipeline/test_notify.py new file mode 100644 index 00000000..c4385569 --- /dev/null +++ b/tests/pipeline/test_notify.py @@ -0,0 +1,105 @@ +"""Test cases for SaveDetectionSamples.""" + +from PIL import Image +import os +import json +import logging +from http.server import BaseHTTPRequestHandler, HTTPServer +import socket +from threading import Thread, Event + +from ambianic import DEFAULT_DATA_DIR, config +from ambianic.pipeline.timeline import PipelineContext +from ambianic.pipeline.store import SaveDetectionSamples + +class MockRequestHandler(BaseHTTPRequestHandler): + def __init__(self, event, *args): + self.event:Event = event + BaseHTTPRequestHandler.__init__(self, *args) + def do_POST(self): + self.send_response(200) + self.end_headers() + self.event.set() + return + +def get_free_port(): + s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM) + s.bind(('localhost', 0)) + _, port = s.getsockname() + s.close() + return port + +class HTTPMockServer(object): + def __init__(self, ev): + def handler(*args): + MockRequestHandler(ev, *args) + self.port = get_free_port() + self.mock_server = HTTPServer(('localhost', self.port), handler) + self.thread = Thread(target=self.mock_server.serve_forever) + self.thread.start() + + def stop(self): + self.mock_server.shutdown() + self.thread.join() + + +class _TestSaveDetectionSamples(SaveDetectionSamples): + + _save_sample_called = False + _img_path = None + _json_path = None + _inf_result = None + + def _save_sample(self, + inf_time=None, + image=None, + thumbnail=None, + inference_result=None, + inference_meta=None): + self._save_sample_called = True + self._inf_result = inference_result + self._img_path, self._json_path = \ + super()._save_sample(inf_time=inf_time, + image=image, + thumbnail=thumbnail, + inference_result=inference_result, + inference_meta=inference_meta) + + +def test_notification(): + + called:Event = Event() + mock_server = HTTPMockServer(called) + + # register the mock server endpoint + config.update({ + 'notifications': { + 'providers': { + 'test': [ + 'json://localhost:%s/webhook' % (mock_server.port) + ], + } + } + }) + + """The first time a positive sample is processed, it should be saved.""" + out_dir = os.path.dirname(os.path.abspath(__file__)) + out_dir = os.path.join(out_dir,'tmp/') + out_dir = os.path.abspath(out_dir) + context = PipelineContext(unique_pipeline_name='test pipeline notify') + context.data_dir = out_dir + notify = {'providers': ['test']} + store = _TestSaveDetectionSamples( + context=context, + event_log=logging.getLogger(), + notify=notify, + ) + img = Image.new('RGB', (60, 30), color='red') + detections = [('person', 0.98, (0, 1, 2, 3))] + processed_samples = list(store.process_sample(image=img, + thumbnail=img, + inference_result=detections)) + assert len(processed_samples) == 1 + mock_server.stop() + assert called.is_set() + From 9359cbb21d2ad70e8353176cdf961eb50570c23c Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Wed, 30 Dec 2020 14:41:43 +0100 Subject: [PATCH 04/10] review pep8 issues --- src/ambianic/__init__.py | 7 ++++--- src/ambianic/notification.py | 14 +++++++++++--- tests/pipeline/test_notify.py | 1 + 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/ambianic/__init__.py b/src/ambianic/__init__.py index 3b58720f..c3d3dcf9 100644 --- a/src/ambianic/__init__.py +++ b/src/ambianic/__init__.py @@ -27,11 +27,12 @@ def __merge_secrets(config:Union[Dynaconf, DynaBox], src_config:Dynaconf = None) if isinstance(val, dict): __merge_secrets(val, src_config) continue - # NOTE value must be an exact match to avoid interfering with other templates + # NOTE value must be an exact match to avoid interfering + # with other templates if ( - isinstance(val, str) and + isinstance(val, str) and (val[0:2] == "${" and val[-1] == "}") - ): + ): ref_key = val[2:-1] ref_val = src_config.get(ref_key, None) if ref_val is not None: diff --git a/src/ambianic/notification.py b/src/ambianic/notification.py index 81bf2664..2138afaf 100644 --- a/src/ambianic/notification.py +++ b/src/ambianic/notification.py @@ -7,8 +7,9 @@ log = logging.getLogger(__name__) + class Notification: - def __init__(self, event:str="detection", data:dict={}, providers:list=["all"]): + def __init__(self, event:str = "detection", data:dict = {}, providers:list = ["all"]): self.event: str = event self.providers: list = providers self.title: str = None @@ -22,6 +23,7 @@ def add_attachments(self, *args): def to_dict(self) -> dict: return dict(vars(self)) + class NotificationHandler: def __init__(self, config: dict = None): if config is None: @@ -34,7 +36,10 @@ def __init__(self, config: dict = None): cfg = [cfg] for provider in cfg: if not self.apobj.add(provider, tag=name): - log.warning("Failed to add notification provider: %s=%s" % (name, provider)) + log.warning( + "Failed to add notification provider: %s=%s" + % (name, provider) + ) def send(self, notification: Notification): @@ -78,7 +83,10 @@ def send(self, notification: Notification): attach=attachments, ) if ok: - log.debug("Sent notification for %s to %s" % (notification.event, provider)) + log.debug( + "Sent notification for %s to %s" % + (notification.event, provider) + ) else: log.warning("Error sending notification for %s to %s" % (notification.event, provider)) diff --git a/tests/pipeline/test_notify.py b/tests/pipeline/test_notify.py index c4385569..6a52fce4 100644 --- a/tests/pipeline/test_notify.py +++ b/tests/pipeline/test_notify.py @@ -12,6 +12,7 @@ from ambianic.pipeline.timeline import PipelineContext from ambianic.pipeline.store import SaveDetectionSamples + class MockRequestHandler(BaseHTTPRequestHandler): def __init__(self, event, *args): self.event:Event = event From 3efc440fd4215f3dbfe88cb9dea22194af04e5d1 Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Wed, 30 Dec 2020 17:46:23 +0100 Subject: [PATCH 05/10] added attachments option and improved coverage --- config.yaml | 15 ++++++---- src/ambianic/notification.py | 16 +++++++---- tests/pipeline/test_notify.py | 54 +++++++++++++++++++++++++++++++---- 3 files changed, 67 insertions(+), 18 deletions(-) diff --git a/config.yaml b/config.yaml index 0330e6d6..5a51a137 100644 --- a/config.yaml +++ b/config.yaml @@ -14,9 +14,12 @@ logging: # Store notifications provider configuration # see https://github.com/caronc/apprise#popular-notification-services for syntax examples # notifications: -# providers: -# email: mailto://userid:pass@domain.com -# alert_fall: +# email: +# include_attachments: true +# providers: +# - mailto://userid:pass@domain.com +# alert_fall: +# providers: # - mailto://userid:pass@domain.com # - json://hostname/a/path/to/post/to @@ -86,8 +89,8 @@ pipelines: - save_detections: # save samples from the inference results positive_interval: 10 idle_interval: 600000 - # notify: # notify a thirdy party service - # providers: - # - alert_fall +# notify: # notify a thirdy party service +# providers: +# - alert_fall diff --git a/src/ambianic/notification.py b/src/ambianic/notification.py index 2138afaf..0483fd07 100644 --- a/src/ambianic/notification.py +++ b/src/ambianic/notification.py @@ -30,11 +30,9 @@ def __init__(self, config: dict = None): config = ambianic.config self.apobj = apprise.Apprise(debug=True) self.config = config.get("notifications", {}) - providers = self.config.get("providers", {}) - for name, cfg in providers.items(): - if not isinstance(cfg, list): - cfg = [cfg] - for provider in cfg: + for name, cfg in self.config.items(): + providers = cfg.get("providers", []) + for provider in providers: if not self.apobj.add(provider, tag=name): log.warning( "Failed to add notification provider: %s=%s" @@ -76,11 +74,17 @@ def send(self, notification: Notification): message = message.replace(k, v) for provider in notification.providers: + cfg = self.config.get(provider, None) + if cfg is None: + log.warning("Skip unknown provider %s" % provider) + continue + + include_attachments = cfg.get("include_attachments", False) ok = self.apobj.notify( message, title=title, tag=provider, - attach=attachments, + attach=attachments if include_attachments else [], ) if ok: log.debug( diff --git a/tests/pipeline/test_notify.py b/tests/pipeline/test_notify.py index 6a52fce4..6e06137b 100644 --- a/tests/pipeline/test_notify.py +++ b/tests/pipeline/test_notify.py @@ -18,10 +18,17 @@ def __init__(self, event, *args): self.event:Event = event BaseHTTPRequestHandler.__init__(self, *args) def do_POST(self): + + # read the message and convert it into a python dictionary + length = int(self.headers.get('content-length')) + message = json.loads(self.rfile.read(length)) + + assert message.get("title") is not None + self.send_response(200) self.end_headers() + self.event.set() - return def get_free_port(): s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM) @@ -66,24 +73,60 @@ def _save_sample(self, inference_result=inference_result, inference_meta=inference_meta) +def test_notification_with_attachments(): + """Ensure a positive detection is notified""" + called: Event = Event() + mock_server = HTTPMockServer(called) -def test_notification(): + # register the mock server endpoint + config.update({ + 'notifications': { + 'test': { + 'include_attachments': True, + 'providers': [ + 'json://localhost:%s/webhook' % (mock_server.port) + ], + } + } + }) + out_dir = os.path.dirname(os.path.abspath(__file__)) + out_dir = os.path.join(out_dir, 'tmp/') + out_dir = os.path.abspath(out_dir) + context = PipelineContext(unique_pipeline_name='test pipeline notify') + context.data_dir = out_dir + notify = {'providers': ['test']} + store = _TestSaveDetectionSamples( + context=context, + event_log=logging.getLogger(), + notify=notify, + ) + img = Image.new('RGB', (60, 30), color='red') + detections = [('person', 0.98, (0, 1, 2, 3))] + processed_samples = list(store.process_sample(image=img, + thumbnail=img, + inference_result=detections)) + assert len(processed_samples) == 1 + mock_server.stop() + assert called.is_set() + +def test_plain_notification(): + """Ensure a positive detection is notified""" called:Event = Event() mock_server = HTTPMockServer(called) # register the mock server endpoint config.update({ 'notifications': { - 'providers': { - 'test': [ + 'test': { + 'include_attachments': True, + 'providers': [ 'json://localhost:%s/webhook' % (mock_server.port) ], } } }) - """The first time a positive sample is processed, it should be saved.""" out_dir = os.path.dirname(os.path.abspath(__file__)) out_dir = os.path.join(out_dir,'tmp/') out_dir = os.path.abspath(out_dir) @@ -103,4 +146,3 @@ def test_notification(): assert len(processed_samples) == 1 mock_server.stop() assert called.is_set() - From 149ce49ec0016b60be9aa8e024bb751cac6846e4 Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Wed, 30 Dec 2020 17:49:14 +0100 Subject: [PATCH 06/10] fix codefactor issues --- config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config.yaml b/config.yaml index 5a51a137..a2eb09c2 100644 --- a/config.yaml +++ b/config.yaml @@ -14,7 +14,7 @@ logging: # Store notifications provider configuration # see https://github.com/caronc/apprise#popular-notification-services for syntax examples # notifications: -# email: +# email: # include_attachments: true # providers: # - mailto://userid:pass@domain.com @@ -90,7 +90,7 @@ pipelines: positive_interval: 10 idle_interval: 600000 # notify: # notify a thirdy party service -# providers: +# providers: # - alert_fall From ae91c3c4cf78bc4d430e53b66681bc971bdce3ac Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Thu, 31 Dec 2020 07:48:20 +0100 Subject: [PATCH 07/10] added event_details_url placeholder --- config.local.yaml | 2 +- src/ambianic/notification.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/config.local.yaml b/config.local.yaml index 660199ff..b3842de0 100644 --- a/config.local.yaml +++ b/config.local.yaml @@ -16,7 +16,7 @@ default: title: "[Ambianic.ai] ${event} recognized" body: > ${event} event has been recognized by the system. - Visit https://ui.ambianic.ai/ for more details. + Visit ${event_details_url} for more details. labels: fall_detection: Fall object_detection: Object diff --git a/src/ambianic/notification.py b/src/ambianic/notification.py index 0483fd07..f1c736c8 100644 --- a/src/ambianic/notification.py +++ b/src/ambianic/notification.py @@ -7,6 +7,7 @@ log = logging.getLogger(__name__) +UI_BASEURL = "https://ui.ambianic.ai" class Notification: def __init__(self, event:str = "detection", data:dict = {}, providers:list = ["all"]): @@ -64,6 +65,7 @@ def send(self, notification: Notification): template_args = { "event_type": notification.event, "event": labels.get(notification.event, notification.event), + "event_details_url": "%s/%s" % (UI_BASEURL, notification.data.get("id", "")) } template_args = {**template_args, **notification.data} From fcdcd6faae04a45e92dfc8d9122486c1fc6f63a0 Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Thu, 31 Dec 2020 07:54:38 +0100 Subject: [PATCH 08/10] drop labels --- config.local.yaml | 3 --- src/ambianic/notification.py | 7 +++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/config.local.yaml b/config.local.yaml index b3842de0..160ce1af 100644 --- a/config.local.yaml +++ b/config.local.yaml @@ -17,9 +17,6 @@ default: body: > ${event} event has been recognized by the system. Visit ${event_details_url} for more details. - labels: - fall_detection: Fall - object_detection: Object sources: {} diff --git a/src/ambianic/notification.py b/src/ambianic/notification.py index f1c736c8..12e863ff 100644 --- a/src/ambianic/notification.py +++ b/src/ambianic/notification.py @@ -42,7 +42,6 @@ def __init__(self, config: dict = None): def send(self, notification: Notification): - labels = dict(self.config.get("labels", {})) templates = self.config.get("templates", {}) title = notification.title @@ -62,9 +61,13 @@ def send(self, notification: Notification): continue attachments.append(a) + event_name = notification.data.get("category", "") + if not event_name: + event_name = notification.event + template_args = { "event_type": notification.event, - "event": labels.get(notification.event, notification.event), + "event": event_name, "event_details_url": "%s/%s" % (UI_BASEURL, notification.data.get("id", "")) } template_args = {**template_args, **notification.data} From 012f94909dde73809f9ca140a4719d19d2fdac7f Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Thu, 31 Dec 2020 07:56:54 +0100 Subject: [PATCH 09/10] fix label var name --- src/ambianic/notification.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/ambianic/notification.py b/src/ambianic/notification.py index 12e863ff..f9e241cc 100644 --- a/src/ambianic/notification.py +++ b/src/ambianic/notification.py @@ -61,13 +61,9 @@ def send(self, notification: Notification): continue attachments.append(a) - event_name = notification.data.get("category", "") - if not event_name: - event_name = notification.event - template_args = { "event_type": notification.event, - "event": event_name, + "event": notification.data.get("label", notification.event), "event_details_url": "%s/%s" % (UI_BASEURL, notification.data.get("id", "")) } template_args = {**template_args, **notification.data} From ef46195a168beb5aba5f12bd9d35e10e09f4052c Mon Sep 17 00:00:00 2001 From: Luca Capra Date: Thu, 31 Dec 2020 18:33:51 +0100 Subject: [PATCH 10/10] fix notification labels --- config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yaml b/config.yaml index a2eb09c2..8c88040d 100644 --- a/config.yaml +++ b/config.yaml @@ -14,7 +14,7 @@ logging: # Store notifications provider configuration # see https://github.com/caronc/apprise#popular-notification-services for syntax examples # notifications: -# email: +# catch_all_email: # include_attachments: true # providers: # - mailto://userid:pass@domain.com