Skip to content

Commit

Permalink
Celery alert processing automation
Browse files Browse the repository at this point in the history
* Update main_cap-aggregator(dev).yml

* added separate configurations for each cap source format

* added region data

* reset migrations

* removed region usage before declaration

* fixed variation scope issue

* created migration

* injecting regions

* injecting countries (#7)

* matched CAP alerts to country polygons

* Integrated Celery and RabbitMQ

* Preparation for region data injection (#6)

* Update main_cap-aggregator(dev).yml

* added separate configurations for each cap source format

* added region data

* reset migrations

* removed region usage before declaration

* fixed variation scope issue

* Add or update the Azure App Service build and deployment workflow config

* add celery tasks

---------

Co-authored-by: HangZhou01 <ucabhz9@ucl.ac.uk>

* added celery startup commands

* changed startup.sh

* added filter for unexpired alerts, fixed cors policy

* enabled github workflow tests for all branches, whitenoise package version bump

* fixed cors policy origin path

* disabled celery for deployment

* reverted cors policy

---------

Co-authored-by: HangZhou01 <ucabhz9@ucl.ac.uk>
  • Loading branch information
Alfiejz and Hang-Zhou-Tim committed Jun 24, 2023
1 parent 5c6fed7 commit 24163ff
Show file tree
Hide file tree
Showing 27 changed files with 601 additions and 154 deletions.
63 changes: 63 additions & 0 deletions .github/workflows/celery-test_cap-aggregator(celery-test).yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Docs for the Azure Web Apps Deploy action: https://github.com/Azure/webapps-deploy
# More GitHub Actions for Azure: https://github.com/Azure/actions
# More info on Python, GitHub Actions, and Azure App Service: https://aka.ms/python-webapps-actions

name: Build and deploy Python app to Azure Web App - cap-aggregator

on:
push:
branches:
- celery-test
workflow_dispatch:

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2

- name: Set up Python version
uses: actions/setup-python@v1
with:
python-version: '3.11'

- name: Create and start virtual environment
run: |
python -m venv venv
source venv/bin/activate
- name: Install dependencies
run: pip install -r requirements.txt

# Optional: Add step to run tests here (PyTest, Django test suites, etc.)

- name: Upload artifact for deployment jobs
uses: actions/upload-artifact@v2
with:
name: python-app
path: |
.
!venv/
deploy:
runs-on: ubuntu-latest
needs: build
environment:
name: 'celery-test'
url: ${{ steps.deploy-to-webapp.outputs.webapp-url }}

steps:
- name: Download artifact from build job
uses: actions/download-artifact@v2
with:
name: python-app
path: .

- name: 'Deploy to Azure Web App'
uses: azure/webapps-deploy@v2
id: deploy-to-webapp
with:
app-name: 'cap-aggregator'
slot-name: 'celery-test'
publish-profile: ${{ secrets.AZUREAPPSERVICE_PUBLISHPROFILE_0FD4FFF13EAD42BCA6117317784F30E7 }}
6 changes: 4 additions & 2 deletions .github/workflows/python-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ name: Python check

on:
push:
branches: [ main ]
branches:
- '*'
pull_request:
branches: [ main ]
branches:
- '*'

jobs:
test_package:
Expand Down
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,7 @@ dmypy.json
# Pyre type checker
.pyre/

.azure
.azure

# editors
.idea/
163 changes: 163 additions & 0 deletions cap_feed/alert_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import json
import requests

import xml.etree.ElementTree as ET
import pytz
from .models import Alert, Region, Country
from datetime import datetime
from django.utils import timezone



# inject region and country data if not already present
def inject_unknown_regions():
if Region.objects.count() == 0:
inject_regions()
# inject unknown region for alerts without a defined region
unknown_region = Region()
unknown_region.id = -1
unknown_region.name = "Unknown"
unknown_region.save()
if Country.objects.count() == 0:
inject_countries()
# inject unknown country for alerts without a defined country
unknown_country = Country()
unknown_country.id = -1
unknown_country.name = "Unknown"
unknown_country.save()

# inject region data
def inject_regions():
with open('cap_feed/region.json') as file:
region_data = json.load(file)
for region_entry in region_data:
region = Region()
region.id = region_entry["id"]
region.name = region_entry["region_name"]
region.centroid = region_entry["centroid"]
coordinates = region_entry["bbox"]["coordinates"][0]
for coordinate in coordinates:
region.polygon += str(coordinate[0]) + "," + str(coordinate[1]) + " "
region.save()

# inject country data
def inject_countries():
with open('cap_feed/country.json') as file:
country_data = json.load(file)
for country_entry in country_data:
country = Country()
country.id = country_entry["id"]
country.name = country_entry["name"]
region_id = country_entry["region"]
if ("Region" in country.name) or ("Cluster" in country.name):
continue
if region_id is not None:
country.region = Region.objects.get(id=country_entry["region"])
if country_entry["iso"] is not None:
country.iso = country_entry["iso"]
if country_entry["iso3"] is not None:
country.iso3 = country_entry["iso3"]
if country_entry["bbox"] is not None:
coordinates = country_entry["bbox"]["coordinates"][0]
for coordinate in coordinates:
country.polygon += str(coordinate[0]) + "," + str(coordinate[1]) + " "
if country_entry["centroid"] is not None:
coordinates = country_entry["centroid"]["coordinates"]
country.centroid = str(coordinates[0]) + "," + str(coordinates[1])
country.save()

# converts CAP1.2 iso format datetime string to datetime object in UTC timezone
def convert_datetime(original_datetime):
return datetime.fromisoformat(original_datetime).astimezone(pytz.timezone('UTC'))

# gets alerts from sources and processes them different for each source format
def get_alerts():
# list of sources and configurations
sources = [
("https://feeds.meteoalarm.org/feeds/meteoalarm-legacy-atom-france", "FRA", "meteoalarm", {'atom': 'http://www.w3.org/2005/Atom', 'cap': 'urn:oasis:names:tc:emergency:cap:1.2'}),
("https://feeds.meteoalarm.org/feeds/meteoalarm-legacy-atom-belgium", "BEL", "meteoalarm", {'atom': 'http://www.w3.org/2005/Atom', 'cap': 'urn:oasis:names:tc:emergency:cap:1.2'}),
("https://feeds.meteoalarm.org/feeds/meteoalarm-legacy-atom-austria", "AUT", "meteoalarm", {'atom': 'http://www.w3.org/2005/Atom', 'cap': 'urn:oasis:names:tc:emergency:cap:1.2'}),
("https://feeds.meteoalarm.org/feeds/meteoalarm-legacy-atom-slovakia", "SVK", "meteoalarm", {'atom': 'http://www.w3.org/2005/Atom', 'cap': 'urn:oasis:names:tc:emergency:cap:1.2'}),
("https://feeds.meteoalarm.org/feeds/meteoalarm-legacy-atom-slovenia", "SVN", "meteoalarm", {'atom': 'http://www.w3.org/2005/Atom', 'cap': 'urn:oasis:names:tc:emergency:cap:1.2'}),
("https://alert.metservice.gov.jm/capfeed.php", "JAM", "capfeedphp", {'atom': 'http://www.w3.org/2005/Atom', 'cap': 'urn:oasis:names:tc:emergency:cap:1.2'}),
]

for source in sources:
url, iso3, format, ns = source
match format:
case "meteoalarm":
get_alert_meteoalarm(url, iso3, ns)
case "capfeedphp":
get_alert_capfeedphp(url, iso3, ns)

# processing for meteoalarm format, example: https://feeds.meteoalarm.org/feeds/meteoalarm-legacy-atom-france
def get_alert_meteoalarm(url, iso3, ns):
response = requests.get(url)
root = ET.fromstring(response.content)
for entry in root.findall('atom:entry', ns):
try:
alert = Alert()
alert.id = entry.find('atom:id', ns).text
alert.identifier = entry.find('cap:identifier', ns).text
alert.sender = url
alert.sent = convert_datetime(entry.find('cap:sent', ns).text)
alert.status = entry.find('cap:status', ns).text
alert.msg_type = entry.find('cap:message_type', ns).text
alert.scope = entry.find('cap:scope', ns).text
alert.urgency = entry.find('cap:urgency', ns).text
alert.severity = entry.find('cap:severity', ns).text
alert.certainty = entry.find('cap:certainty', ns).text
alert.effective = convert_datetime(entry.find('cap:effective', ns).text)
alert.expires = convert_datetime(entry.find('cap:expires', ns).text)
if alert.expires < timezone.now():
continue

alert.area_desc = entry.find('cap:areaDesc', ns).text
alert.event = entry.find('cap:event', ns).text

geocode = entry.find('cap:geocode', ns)
alert.geocode_name = geocode.find('atom:valueName', ns).text
alert.geocode_value = geocode.find('atom:value', ns).text
alert.country = Country.objects.get(iso3=iso3)
alert.save()
except:
pass

# processing for capfeedphp format, example: https://alert.metservice.gov.jm/capfeed.php
def get_alert_capfeedphp(url, iso3, ns):
response = requests.get(url)
root = ET.fromstring(response.content)
for entry in root.findall('atom:entry', ns):
try:
alert = Alert()
alert.id = entry.find('atom:id', ns).text

entry_content = entry.find('atom:content', ns)
entry_content_alert = entry_content.find('cap:alert', ns)
alert.identifier = entry_content_alert.find('cap:identifier', ns).text
alert.sender = entry_content_alert.find('cap:sender', ns).text
alert.sent = convert_datetime(entry_content_alert.find('cap:sent', ns).text)
alert.status = entry_content_alert.find('cap:status', ns).text
alert.msg_type = entry_content_alert.find('cap:msgType', ns).text
alert.scope = entry_content_alert.find('cap:scope', ns).text

entry_content_alert_info = entry_content_alert.find('cap:info', ns)
alert.urgency = entry_content_alert_info.find('cap:urgency', ns).text
alert.severity = entry_content_alert_info.find('cap:severity', ns).text
alert.certainty = entry_content_alert_info.find('cap:certainty', ns).text
alert.effective = convert_datetime(entry_content_alert_info.find('cap:effective', ns).text)
alert.expires = convert_datetime(entry_content_alert_info.find('cap:expires', ns).text)
if alert.expires < timezone.now():
continue
alert.event = entry_content_alert_info.find('cap:event', ns).text

entry_content_alert_info_area = entry_content_alert_info.find('cap:area', ns)
alert.area_desc = entry_content_alert_info_area.find('cap:areaDesc', ns).text
alert.polygon = entry_content_alert_info_area.find('cap:polygon', ns).text
alert.country = Country.objects.get(iso3=iso3)
alert.save()
except:
pass

def remove_expired_alerts():
Alert.objects.filter(expires__lt=timezone.now()).delete()
Loading

0 comments on commit 24163ff

Please sign in to comment.