Skip to content

Commit

Permalink
Refactor notifications queue
Browse files Browse the repository at this point in the history
* Refactors NotificationBase to be concerned only with sending the
  message. Notifications now have no knowledge of the celery queue
  or Project/Service models
* Alert model handles expanding alert json based on Project/Service
  model but has no knowledge about configured sender plugins

* Incoming alerts are saved to the database for auditing purposes and
  queued to be processed
* Processing alerts expands the alert data, and de-duplicates sender
  targets before queuing to be sent
  • Loading branch information
kfdm committed Jul 26, 2018
1 parent c430506 commit 088689d
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 112 deletions.
7 changes: 6 additions & 1 deletion promgen/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from django import forms
from django.contrib import admin

from promgen import models, plugins


Expand Down Expand Up @@ -96,3 +95,9 @@ def get_queryset(self, request):
class PrometheusAdmin(admin.ModelAdmin):
list_display = ('shard', 'host', 'port')
list_filter = ('shard',)


@admin.register(models.Alert)
class AlertAdmin(admin.ModelAdmin):
    # Admin listing for stored Alert rows. Both model fields are exposed
    # read-only, so the admin can be used to inspect a saved alert body
    # without editing it.
    list_display = (
        'created',
    )
    readonly_fields = (
        'created',
        'body',
    )
31 changes: 31 additions & 0 deletions promgen/management/commands/test-alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) 2018 LINE Corporation
# These sources are released under the terms of the MIT license: see LICENSE

import json
import logging

from django.core.management.base import BaseCommand
from django.test import override_settings
from promgen import models, tasks, tests


class Command(BaseCommand):
    '''
    Push the bundled example Alertmanager payload through alert processing

    Creates (or reuses) a Shard, Service and Project matching the labels in
    the example payload, stores the payload as an Alert and then calls
    tasks.process_alert on it with debug logging enabled.
    '''

    # NOTE(review): presumably this makes the queued send tasks run inline
    # instead of going to a worker - confirm against the celery configuration
    @override_settings(CELERY_TASK_ALWAYS_EAGER=True)
    def handle(self, **kwargs):
        # logging.basicConfig() does nothing when the root logger already has
        # handlers, so detach the existing ones first. Go through the public
        # root logger API rather than overwriting logging's private state.
        root = logging.getLogger()
        for handler in list(root.handlers):
            root.removeHandler(handler)
        logging.basicConfig(level=logging.DEBUG)

        data = tests.PromgenTest.data_json('examples', 'alertmanager.json')
        labels = data['commonLabels']

        # Make sure the objects named by the example labels exist
        shard, _ = models.Shard.objects.get_or_create(name='Shard Test')
        service, _ = models.Service.objects.get_or_create(
            shard=shard, name=labels['service']
        )
        models.Project.objects.get_or_create(
            service=service, name=labels['project']
        )

        alert = models.Alert.objects.create(
            body=json.dumps(data)
        )
        # Called directly (not .delay) so processing happens in this process
        tasks.process_alert(alert.pk)
23 changes: 23 additions & 0 deletions promgen/migrations/0003_alert_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 2.0.7 on 2018-07-26 08:15

from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone


class Migration(migrations.Migration):
    # Adds the Alert table: an auto id, a creation timestamp defaulting to
    # the current time, and the alert body stored as text.

    dependencies = [
        ('promgen', '0002_auto_20180316_0525'),
    ]

    operations = [
        migrations.CreateModel(
            name='Alert',
            fields=[
                (
                    'id',
                    models.AutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name='ID',
                    ),
                ),
                (
                    'created',
                    models.DateTimeField(default=django.utils.timezone.now),
                ),
                (
                    'body',
                    models.TextField(),
                ),
            ],
        ),
    ]
56 changes: 47 additions & 9 deletions promgen/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,19 @@ def driver_set(cls):
except ImportError:
logger.warning('Error importing %s', entry.module_name)

__driver = {}
@property
def driver(self):
'''Return configured driver for Sender model instance'''
if self.sender in self.__driver:
return self.__driver[self.sender]

for entry in plugins.notifications():
if entry.module_name == self.sender:
try:
return entry.load()()
except ImportError:
logger.warning('Error importing %s', entry.module_name)
try:
self.__driver[entry.module_name] = entry.load()()
except ImportError:
logger.warning('Error importing %s', entry.module_name)
return self.__driver[self.sender]

def test(self):
'''
Expand All @@ -126,10 +130,8 @@ def test(self):
for alert in data.get('alerts', []):
alert['labels'][self.content_type.name] = self.content_object.name

for entry in plugins.notifications():
if entry.module_name == self.sender:
plugin = entry.load()()
plugin.test(self.value, data)
from promgen import tasks
tasks.send_alert(self.sender, self.value, data)


class Shard(models.Model):
Expand Down Expand Up @@ -446,6 +448,42 @@ class RuleAnnotation(models.Model):
rule = models.ForeignKey('Rule', on_delete=models.CASCADE)


class Alert(models.Model):
    '''
    Alert body stored as text together with the time it was created
    '''
    created = models.DateTimeField(default=timezone.now)
    body = models.TextField()

    def expand(self):
        '''
        Decode the stored body and link its common labels to Promgen objects

        Returns a (routable, data) tuple. routable maps a label name to the
        Promgen object found for it, and data is the decoded body with
        commonLabels / commonAnnotations guaranteed to exist and an
        annotation added for every object that was found.
        '''
        # Prometheus label name paired with the Promgen model it refers to
        label_models = (
            ('project', Project),
            ('service', Service),
        )

        data = self.json()
        labels = data.setdefault('commonLabels', {})
        annotations = data.setdefault('commonAnnotations', {})
        routable = {}

        for label, model in label_models:
            if label in labels:
                # filter() rather than get(): no match simply yields nothing
                # instead of raising, and several matches keep the last one
                for match in model.objects.filter(name=labels[label]):
                    logger.debug('Found %s %s', label, match)
                    routable[label] = match
                    annotations[label] = resolve_domain(match)
            else:
                logger.debug('Missing label %s', label)

        return routable, data

    def json(self):
        '''Return the stored body decoded from JSON'''
        return json.loads(self.body)


class Audit(models.Model):
body = models.TextField()
created = models.DateTimeField()
Expand Down
85 changes: 0 additions & 85 deletions promgen/notification/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
from django import forms
from django.conf import settings
from django.template.loader import render_to_string
from promgen import tasks
from promgen.models import Project, Service
from promgen.shortcuts import resolve_domain

logger = logging.getLogger(__name__)

Expand All @@ -23,10 +20,6 @@ class NotificationBase(object):
'''
Base Notification class
'''
MAPPING = [
('project', Project),
('service', Service),
]

form = FormSenderBase

Expand All @@ -35,20 +28,6 @@ def help(cls):
if cls.__doc__:
return textwrap.dedent(cls.__doc__)

@classmethod
def process(cls, data):
'''
Process a notification
By default, this will just queue an item in celery to be processed but in some cases
a notifier may want to immediately process it or otherwise send a message, so we
provide this entry hook
'''
params = {'args': (cls.__module__, data)}
if hasattr(cls, 'queue'):
params['queue'] = getattr(cls, 'queue')
tasks.send_notification.apply_async(**params)

def _send(self, target, alert):
'''
Sender specific implmentation
Expand All @@ -72,70 +51,6 @@ def config(self, key):
except KeyError:
logger.error('Undefined setting. Please check for %s under %s in settings.yml', key, self.__module__)

def expand(self, data):
'''
Look through our alert and expand any objects we find
This is responsible for looking through our alerts and finding any
supported labels (like Projects and Services) that we can expand into
an annotation
'''
output = {}

data.setdefault('commonLabels', {})
data.setdefault('commonAnnotations', {})

# Look through our labels and find the object from Promgen's DB
# If we find an object in Promgen, add an annotation with a direct link
for label, klass in self.MAPPING:
if label not in data['commonLabels']:
logger.debug('Missing label %s', label)
continue

# Should only find a single value, but I think filter is a little
# bit more forgiving than get in terms of throwing errors
for obj in klass.objects.filter(name=data['commonLabels'][label]):
logger.debug('Found %s %s', label, obj)
output[label] = obj
data['commonAnnotations'][label] = resolve_domain(obj)
return output

def send(self, data):
'''
Send out an alert
This handles looping through the alerts from Alert Manager and checks
to see if there are any notification senders configured for the
combination of project/service and sender type.
See tests/examples/alertmanager.json for an example payload
'''
sent = 0
output = self.expand(data)

for label, obj in output.items():
for sender in obj.notifiers.filter(sender=self.__module__):
try:
self._send(sender.value, data)
except:
logger.exception('Error sending notification')
else:
sent += 1
if sent == 0:
logger.debug('No senders configured for project or service')
return sent

def test(self, target, data):
'''
Send out test notification
Combine a simple test alert from our view, with the remaining required
parameters for our sender child classes
'''
logger.debug('Sending test message to %s', target)
self.expand(data)
self._send(target, data)

def render(self, template, context):
s = render_to_string(template, context).strip()
# Uncomment to re-generate test templates
Expand Down
5 changes: 5 additions & 0 deletions promgen/notification/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class NotificationUser(NotificationBase):

form = FormUser

def splay(self, address):
    '''Yield every Sender registered for the user whose username is address'''
    owner = User.objects.get(username=address)
    yield from models.Sender.filter(obj=owner)

def _send(self, address, data):
user = User.objects.get(username=address)
for sender in models.Sender.filter(obj=user):
Expand Down
53 changes: 41 additions & 12 deletions promgen/tasks.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,54 @@
# Copyright (c) 2017 LINE Corporation
# These sources are released under the terms of the MIT license: see LICENSE

import json
import collections
import logging

from celery import shared_task

from promgen import plugins, prometheus, signals # NOQA
from promgen import models, plugins, prometheus, signals # NOQA

logger = logging.getLogger(__name__)


@shared_task
def send_notification(sender, body):
body = json.loads(body)
logger.info('Attempting to send alert for %s', sender)
def process_alert(alert_pk):
    '''
    Route a stored alert to its notification targets

    Loads the Alert by primary key and expands it to find the routable
    objects. The senders configured for those objects are collected as
    sender -> set of targets, so each sender/target pair is queued only
    once, and one send_alert task is queued per pair.
    '''
    alert = models.Alert.objects.get(pk=alert_pk)
    routable, data = alert.expand()

    # Driver name -> unique targets; the set is what removes duplicates
    targets_by_driver = collections.defaultdict(set)
    for label, obj in routable.items():
        logger.debug('Processing %s %s', label, obj)
        for sender in models.Sender.filter(obj):
            if hasattr(sender.driver, 'splay'):
                # A driver with splay() is replaced by the senders it yields
                expanded = sender.driver.splay(sender.value)
            else:
                expanded = [sender]
            for entry in expanded:
                targets_by_driver[entry.sender].add(entry.value)

    for driver, targets in targets_by_driver.items():
        for target in targets:
            send_alert.delay(driver, target, data, alert.pk)


@shared_task
def send_alert(sender, target, data, alert_pk=None):
'''
Send alert to specific target
alert_pk is used for debugging purposes
'''
logger.debug('Sending %s %s', sender, target)
for plugin in plugins.notifications():
if sender == plugin.module_name:
try:
instance = plugin.load()()
count = instance.send(body)
logger.info('Sent %d alerts with %s', count, sender)
except:
logger.exception('Error sending message')
instance = plugin.load()()
instance._send(target, data)
2 changes: 1 addition & 1 deletion promgen/tests/test_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@ def test_email(self, mock_email):
'promgen@example.com',
['foo@example.com']
)
])
], any_order=True)
# Three senders are registered but only two should trigger
self.assertTrue(mock_email.call_count == 2)
9 changes: 5 additions & 4 deletions promgen/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

import promgen.templatetags.promgen as macro
from promgen import (celery, discovery, forms, models, plugins, prometheus,
signals, util, version)
signals, tasks, util, version)
from promgen.shortcuts import resolve_domain

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -899,9 +899,10 @@ def post(self, request):

class Alert(View):
def post(self, request, *args, **kwargs):
body = request.body.decode('utf-8')
for entry in plugins.notifications():
entry.load().process(body)
alert = models.Alert.objects.create(
body=request.body.decode('utf-8')
)
tasks.process_alert.delay(alert.pk)
return HttpResponse('OK', status=202)


Expand Down

0 comments on commit 088689d

Please sign in to comment.