diff --git a/front/plugins/README.md b/front/plugins/README.md
index 441786427..2d95ab58a 100755
--- a/front/plugins/README.md
+++ b/front/plugins/README.md
@@ -28,11 +28,13 @@
| | | SNMPDSC | Script | 📚[snmp_discovery](/front/plugins/snmp_discovery/) |
| | Yes* | UNDIS | Script | 📚[undiscoverables](/front/plugins/undiscoverables/) |
| | Yes | UNFIMP | Script | 📚[unifi_import](/front/plugins/unifi_import/) |
+| | | VNDRPDT | Script | 📚[vendor_update](/front/plugins/vendor_update/) |
| | | WEBMON | Script | 📚[website_monitor](/front/plugins/website_monitor/) |
| N/A | | N/A | SQL query | No example available, but the External SQLite based plugins work very similar |
> \* The Undiscoverables plugin (`UNDIS`) inserts only user-specified dummy devices.
-> \* The dabase cleanup plugin is not _required_ but the app will become unusable after a while if not executed.
+> \* The database cleanup plugin (`DBCLNP`) is not _required_ but the app will become unusable after a while if not executed.
> [!NOTE]
> You soft-disable plugins via Settings or completely ignore plugins by placing a `ignore_plugin` file into the plugin directory. The difference is that ignored plugins don't show up anywhere in the UI (Settings, Device details, Plugins pages). The app skips ignored plugins completely. Device-detecting plugins insert values into the `CurrentScan` database table. The plugins that are not required are safe to ignore, however it makes sense to have a least some device-detecting plugins (that insert entries into the `CurrentScan` table) enabled, such as ARPSCAN or PIHOLE.
@@ -568,7 +570,6 @@ You can have any `"function": "my_custom_name"` custom name, however, the ones l
| | - `missing-in-last-scan` - if the object is missing compared to previous scans |
> 🔎 Example:
> ```json
diff --git a/front/plugins/internet_ip/README.md b/front/plugins/internet_ip/README.md
new file mode 100755
index 000000000..03fa70f4c
--- /dev/null
+++ b/front/plugins/internet_ip/README.md
@@ -0,0 +1,7 @@
+## Overview
+Plugin to run regular database cleanup tasks. It is strongly recommended to have an hourly or at least daily schedule running.
+### Usage
+- Check the Settings page for details.
diff --git a/front/plugins/internet_ip/config.json b/front/plugins/internet_ip/config.json
new file mode 100755
index 000000000..8d242dd6b
--- /dev/null
+++ b/front/plugins/internet_ip/config.json
@@ -0,0 +1,180 @@
+ "code_name": "internet_ip",
+ "unique_prefix": "INTRNT",
+ "enabled": true,
+ "data_source": "script",
+ "show_ui": true,
+ "localized": ["display_name", "description", "icon"],
+ "display_name": [
+ {
+ "language_code": "en_us",
+ "string": "Internet check"
+ }
+ ],
+ "icon": [
+ {
+ "language_code": "en_us",
+ "string": ""
+ }
+ ],
+ "description": [
+ {
+ "language_code": "en_us",
+ "string": "A plugin to check your internet connectivity and IP."
+ }
+ ],
+ "params" : [{
+ "name" : "pluginskeephistory",
+ "type" : "setting",
+ "value" : "PLUGINS_KEEP_HIST"
+ },
+ {
+ "name" : "daystokeepevents",
+ "type" : "setting",
+ "value" : "DAYS_TO_KEEP_EVENTS"
+ },
+ {
+ "name" : "hourstokeepnewdevice",
+ "type" : "setting",
+ "value" : "HRS_TO_KEEP_NEWDEV"
+ },
+ {
+ "name" : "pholuskeepdays",
+ "type" : "setting",
+ "value" : "PHOLUS_DAYS_DATA"
+ }
+ ],
+ "settings": [
+ {
+ "function": "RUN",
+ "type": "text.select",
+ "default_value":"schedule",
+ "options": ["disabled", "once", "schedule", "always_after_scan"],
+ "localized": ["name", "description"],
+ "name" :[{
+ "language_code":"en_us",
+ "string" : "When to run"
+ },
+ {
+ "language_code":"es_es",
+ "string" : "Cuándo ejecutar"
+ },
+ {
+ "language_code":"de_de",
+ "string" : "Wann laufen"
+ }],
+ "description": [{
+ "language_code":"en_us",
+ "string" : "When the cleanup should be performed. An hourly or daily SCHEDULE
is a good option."
+ }]
+ },
+ {
+ "function": "CMD",
+ "type": "readonly",
+ "default_value": "python3 /home/pi/pialert/front/plugins/db_cleanup/script.py pluginskeephistory={pluginskeephistory} hourstokeepnewdevice={hourstokeepnewdevice} daystokeepevents={daystokeepevents} pholuskeepdays={pholuskeepdays}",
+ "options": [],
+ "localized": ["name", "description"],
+ "name": [
+ {
+ "language_code": "en_us",
+ "string": "Command"
+ },
+ {
+ "language_code": "es_es",
+ "string": "Comando"
+ },
+ {
+ "language_code": "de_de",
+ "string": "Befehl"
+ }
+ ],
+ "description": [
+ {
+ "language_code": "en_us",
+ "string": "Command to run. This can not be changed"
+ },
+ {
+ "language_code": "es_es",
+ "string": "Comando a ejecutar. Esto no se puede cambiar"
+ },
+ {
+ "language_code": "de_de",
+ "string": "Befehl zum Ausführen. Dies kann nicht geändert werden"
+ }
+ ]
+ },
+ {
+ "function": "RUN_SCHD",
+ "type": "text",
+ "default_value":"*/30 * * * *",
+ "options": [],
+ "localized": ["name", "description"],
+ "name" : [{
+ "language_code":"en_us",
+ "string" : "Schedule"
+ },
+ {
+ "language_code":"es_es",
+ "string" : "Schedule"
+ },
+ {
+ "language_code":"de_de",
+ "string" : "Schedule"
+ }],
+ "description": [{
+ "language_code":"en_us",
+ "string" : "Only enabled if you select schedule
setting. Make sure you enter the schedule in the correct cron-like format (e.g. validate at crontab.guru). For example entering 0 4 * * *
will run the scan after 4 am in the TIMEZONE
you set above. Will be run NEXT time the time passes."
+ },
+ {
+ "language_code":"es_es",
+ "string" : "Solo está habilitado si selecciona schedule
en la configuración DBCLNP_RUN
. Asegúrese de ingresar la programación en el formato similar a cron correcto (por ejemplo, valide en crontab.guru). Por ejemplo, ingresar 0 4 * * *
ejecutará el escaneo después de las 4 a.m. en el TIMEZONE código> que configuró arriba
. Se ejecutará la PRÓXIMA vez que pase el tiempo."
+ },
+ {
+ "language_code":"de_de",
+ "string" : "Nur aktiviert, wenn Sie schedule
-Einstellung auswählen. Stellen Sie sicher, dass Sie den Zeitplan im richtigen Cron-ähnlichen Format eingeben (z. B. validieren unter crontab.guru). Wenn Sie beispielsweise 0 4 * * *
eingeben, wird der Scan nach 4 Uhr morgens in der TIMEZONE ausgeführt. Code> den Sie oben festgelegt haben
. Wird das NÄCHSTE Mal ausgeführt, wenn die Zeit vergeht."
+ }]
+ },
+ {
+ "function": "RUN_TIMEOUT",
+ "type": "integer",
+ "default_value": 30,
+ "options": [],
+ "localized": ["name", "description"],
+ "name": [
+ {
+ "language_code": "en_us",
+ "string": "Run timeout"
+ },
+ {
+ "language_code": "es_es",
+ "string": "Tiempo límite de ejecución"
+ },
+ {
+ "language_code": "de_de",
+ "string": "Zeitüberschreitung"
+ }
+ ],
+ "description": [
+ {
+ "language_code": "en_us",
+ "string": "Maximum time in seconds to wait for the script to finish. If this time is exceeded the script is aborted."
+ },
+ {
+ "language_code": "es_es",
+ "string": "Tiempo máximo en segundos para esperar a que finalice el script. Si se supera este tiempo, el script se cancela."
+ },
+ {
+ "language_code": "de_de",
+ "string": "Maximale Zeit in Sekunden, die auf den Abschluss des Skripts gewartet werden soll. Bei Überschreitung dieser Zeit wird das Skript abgebrochen."
+ }
+ ]
+ }
+ ],
+ "database_column_definitions":
+ [
+ ]
diff --git a/front/plugins/internet_ip/ignore_plugin b/front/plugins/internet_ip/ignore_plugin
new file mode 100644
index 000000000..e69de29bb
diff --git a/front/plugins/internet_ip/script.py b/front/plugins/internet_ip/script.py
new file mode 100755
index 000000000..d69734e9f
--- /dev/null
+++ b/front/plugins/internet_ip/script.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+# test script by running:
+# /home/pi/pialert/front/plugins/internet_ip/script.py TBD
+import os
+import pathlib
+import argparse
+import sys
+import hashlib
+import csv
+import subprocess
+import re
+import sqlite3
+from io import StringIO
+from datetime import datetime
+from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64
+from logger import mylog, append_line_to_file
+from helper import timeNowTZ, get_internet_IP
+from const import logPath, pialertPath, fullDbPath
+CUR_PATH = str(pathlib.Path(__file__).parent.resolve())
+LOG_FILE = os.path.join(CUR_PATH, 'script.log')
+RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log')
+def main():
+ mylog('verbose', ['[INTRNT] In script'])
+ parser = argparse.ArgumentParser(description='Check internet connectivity and IP')
+ parser.add_argument('pluginskeephistory', action="store", help="TBC")
+ parser.add_argument('hourstokeepnewdevice', action="store", help="TBC")
+ parser.add_argument('daystokeepevents', action="store", help="TBC")
+ parser.add_argument('pholuskeepdays', action="store", help="TBC")
+ values = parser.parse_args()
+ DDNS_ACTIVE = values.TBC.split('=')[1]
+ DDNS_UPDATE_URL = values.TBC.split('=')[1]
+ DDNS_USER = values.TBC.split('=')[1]
+ DDNS_PASSWORD = values.TBC.split('=')[1]
+ DDNS_DOMAIN = values.TBC.split('=')[1]
+ # Connect to the PiAlert SQLite database
+ conn = sqlite3.connect(fullDbPath)
+ cursor = conn.cursor()
+ # do stuff
+ cursor.execute ("""SELECT from Online_History""") # TODO delete
+ conn.commit()
+ # Close the database connection
+ conn.close()
+ mylog('verbose', ['[INTRNT] Finished '])
+ return 0
+ # Header
+ updateState("Scan: Internet IP")
+ mylog('verbose', ['[INTRNT] Check Internet IP started'])
+ # Get Internet IP
+ mylog('verbose', ['[INTRNT] - Retrieving Internet IP'])
+ internet_IP = get_internet_IP()
+ # TESTING - Force IP
+ # internet_IP = ""
+ # Check result = IP
+ if internet_IP == "" :
+ mylog('none', ['[INTRNT] Error retrieving Internet IP'])
+ mylog('none', ['[INTRNT] Exiting...'])
+ return False
+ mylog('verbose', ['[INTRNT] IP: ', internet_IP])
+ # Get previous stored IP
+ mylog('verbose', ['[INTRNT] Retrieving previous IP:'])
+ previous_IP = get_previous_internet_IP (conn, cursor)
+ mylog('verbose', ['[INTRNT] ', previous_IP])
+ # Check IP Change
+ if internet_IP != previous_IP :
+ mylog('minimal', ['[INTRNT] New internet IP: ', internet_IP])
+ save_new_internet_IP (conn, cursor, internet_IP)
+ else :
+ mylog('verbose', ['[INTRNT] No changes to perform'])
+ # Get Dynamic DNS IP
+ mylog('verbose', ['[DDNS] Retrieving Dynamic DNS IP'])
+ dns_IP = get_dynamic_DNS_IP()
+ # Check Dynamic DNS IP
+ if dns_IP == "" or dns_IP == "" :
+ mylog('none', ['[DDNS] Error retrieving Dynamic DNS IP'])
+ mylog('none', ['[DDNS] ', dns_IP])
+ # Check DNS Change
+ if dns_IP != internet_IP :
+ mylog('none', ['[DDNS] Updating Dynamic DNS IP'])
+ mylog('none', ['[DDNS] ', message])
+ else :
+ mylog('verbose', ['[DDNS] No changes to perform'])
+ else :
+ mylog('verbose', ['[DDNS] Skipping Dynamic DNS update'])
+def save_new_internet_IP (conn, cursor, pNewIP):
+ # Log new IP into logfile
+ append_line_to_file (logPath + '/IP_changes.log',
+ '['+str(timeNowTZ()) +']\t'+ pNewIP +'\n')
+ prevIp = get_previous_internet_IP(conn, cursor)
+ # Save event
+ cursor.execute ("""INSERT INTO Events (eve_MAC, eve_IP, eve_DateTime,
+ eve_EventType, eve_AdditionalInfo,
+ eve_PendingAlertEmail)
+ VALUES ('Internet', ?, ?, 'Internet IP Changed',
+ 'Previous Internet IP: '|| ?, 1) """,
+ (pNewIP, timeNowTZ(), prevIp) )
+ # Save new IP
+ cursor.execute ("""UPDATE Devices SET dev_LastIP = ?
+ WHERE dev_MAC = 'Internet' """,
+ (pNewIP,) )
+ # commit changes
+ conn.commit()
+def get_previous_internet_IP (conn, cursor):
+ previous_IP = ''
+ # get previous internet IP stored in DB
+ cursor.execute ("SELECT dev_LastIP FROM Devices WHERE dev_MAC = 'Internet' ")
+ result = db.sql.fetchone()
+ conn.commit()
+ if result is not None and len(result) > 0 :
+ previous_IP = result[0]
+ # return previous IP
+ return previous_IP
+def get_dynamic_DNS_IP (DDNS_DOMAIN):
+ # Using OpenDNS server
+ # dig_args = ['dig', '+short', DDNS_DOMAIN, '@resolver1.opendns.com']
+ # Using default DNS server
+ dig_args = ['dig', '+short', DDNS_DOMAIN]
+ try:
+ # try runnning a subprocess
+ dig_output = subprocess.check_output (dig_args, universal_newlines=True)
+ except subprocess.CalledProcessError as e:
+ # An error occured, handle it
+ mylog('none', ['[DDNS] ERROR - ', e.output])
+ dig_output = '' # probably no internet
+ # Check result is an IP
+ IP = check_IP_format (dig_output)
+ # Handle invalid response
+ if IP == '':
+ IP = ''
+ return IP
+ try:
+ # try runnning a subprocess
+ # Update Dynamic IP
+ curl_output = subprocess.check_output (['curl', '-s',
+ 'username=' + DDNS_USER +
+ '&password=' + DDNS_PASSWORD +
+ '&hostname=' + DDNS_DOMAIN],
+ universal_newlines=True)
+ except subprocess.CalledProcessError as e:
+ # An error occured, handle it
+ mylog('none', ['[DDNS] ERROR - ',e.output])
+ curl_output = ""
+ return curl_output
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/front/plugins/vendor_update/script.py b/front/plugins/vendor_update/script.py
index 90325cbea..4837f0553 100755
--- a/front/plugins/vendor_update/script.py
+++ b/front/plugins/vendor_update/script.py
@@ -32,7 +32,7 @@ def main():
mylog('verbose', ['[VNDRPDT] In script'])
# Get newest DB
- # update_vendor_database() TODOz
+ update_vendor_database()
# Resolve missing vendors
plugin_objects = Plugin_Objects(RESULT_FILE)
diff --git a/pialert/__main__.py b/pialert/__main__.py
index c4b8d772e..a1b926b84 100755
--- a/pialert/__main__.py
+++ b/pialert/__main__.py
@@ -98,8 +98,7 @@ def main ():
conf.loop_start_time = timeNowTZ()
# TODO fix these
- loop_start_time = conf.loop_start_time # TODO fix
- last_update_vendors = conf.last_update_vendors
+ loop_start_time = conf.loop_start_time # TODO fix
last_version_check = conf.last_version_check
# check if new version is available / only check once an hour
diff --git a/pialert/conf.py b/pialert/conf.py
index baf9fa135..90365b0f2 100755
--- a/pialert/conf.py
+++ b/pialert/conf.py
@@ -6,7 +6,6 @@
# These are global variables, not config items and should not exist !
mySettings = []
mySettingsSQLsafe = []
-debug_force_notification = False
cycle = 1
userSubnets = []
mySchedules = [] # bad solution for global - TO-DO
@@ -23,7 +22,6 @@
startTime = ''
last_internet_IP_scan = ''
last_scan_run = ''
-last_update_vendors = ''
last_version_check = ''
arpscan_devices = []
@@ -31,75 +29,83 @@
mqtt_connected_to_broker = False
mqtt_sensors = []
client = None # mqtt client
-# for notifications
+# -------------------------------------------
# General
-SCAN_SUBNETS = [' --interface=eth1', ' --interface=eth0']
-LOG_LEVEL = 'verbose'
-TIMEZONE = 'Europe/Berlin'
-PIALERT_WEB_PASSWORD = '8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92'
-INCLUDED_SECTIONS = ['internet', 'new_devices', 'down_devices', 'events']
-REPORT_DASHBOARD_URL = 'http://pi.alert/'
-DIG_GET_IP_ARG = '-4 myip.opendns.com @resolver1.opendns.com'
-UI_LANG = 'English'
-UI_PRESENCE = ['online', 'offline', 'archived']
+# -------------------------------------------
+SCAN_SUBNETS = [' --interface=eth1', ' --interface=eth0']
+LOG_LEVEL = 'verbose'
+TIMEZONE = 'Europe/Berlin'
+DIG_GET_IP_ARG = '-4 myip.opendns.com @resolver1.opendns.com'
+UI_LANG = 'English'
+UI_PRESENCE = ['online', 'offline', 'archived']
+PIALERT_WEB_PASSWORD = '8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92'
+INCLUDED_SECTIONS = ['internet', 'new_devices', 'down_devices', 'events']
+REPORT_DASHBOARD_URL = 'http://pi.alert/'
+# -------------------------------------------
+# Notification gateways
+# -------------------------------------------
# Email
-SMTP_PORT = 587
-REPORT_TO = 'user@gmail.com'
-REPORT_FROM = 'Pi.Alert '
+SMTP_PORT = 587
+REPORT_TO = 'user@gmail.com'
+REPORT_FROM = 'Pi.Alert '
# Webhooks
# Apprise
-NTFY_HOST ='https://ntfy.sh'
+NTFY_HOST = 'https://ntfy.sh'
-MQTT_PORT = 1883
-# DynDNS
-DDNS_DOMAIN = 'your_domain.freeddns.org'
-DDNS_USER = 'dynu_user'
-DDNS_PASSWORD = 'A0000000B0000000C0000000D0000000'
-DDNS_UPDATE_URL = 'https://api.dynu.com/nic/update?'
+MQTT_PORT = 1883
+# -------------------------------------------
+# Misc
+# -------------------------------------------
-API_CUSTOM_SQL = 'SELECT * FROM Devices WHERE dev_PresentLastScan = 0'
\ No newline at end of file
+API_CUSTOM_SQL = 'SELECT * FROM Devices WHERE dev_PresentLastScan = 0'
+# DynDNS
+DDNS_DOMAIN = 'your_domain.freeddns.org'
+DDNS_USER = 'dynu_user'
+DDNS_PASSWORD = 'A0000000B0000000C0000000D0000000'
+DDNS_UPDATE_URL = 'https://api.dynu.com/nic/update?'
\ No newline at end of file
diff --git a/pialert/const.py b/pialert/const.py
index d66c53c9b..c5eb07710 100755
--- a/pialert/const.py
+++ b/pialert/const.py
@@ -4,10 +4,9 @@
pialertPath = '/home/pi/pialert'
-#pialertPath ='/home/roland/repos/Pi.Alert'
-confPath = "/config/pialert.conf"
-dbPath = '/db/pialert.db'
+confPath = "/config/pialert.conf"
+dbPath = '/db/pialert.db'
pluginsPath = pialertPath + '/front/plugins'
@@ -15,8 +14,8 @@
apiPath = pialertPath + '/front/api/'
fullConfPath = pialertPath + confPath
fullDbPath = pialertPath + dbPath
-vendorsPath6 = '/usr/share/arp-scan/ieee-oui.txt'
-vendorsPath9 = '/usr/share/arp-scan/ieee-iab.txt'
+vendorsPath6 = '/usr/share/arp-scan/ieee-oui.txt'
+vendorsPath9 = '/usr/share/arp-scan/ieee-iab.txt'
diff --git a/pialert/device.py b/pialert/device.py
index 524dccdd6..eeb53f335 100755
--- a/pialert/device.py
+++ b/pialert/device.py
@@ -3,8 +3,7 @@
import conf
import re
-from helper import timeNowTZ, get_setting, get_setting_value,resolve_device_name_dig, resolve_device_name_pholus
-from scanners.internet import check_IP_format, get_internet_IP
+from helper import timeNowTZ, get_setting, get_setting_value,resolve_device_name_dig, resolve_device_name_pholus, check_IP_format, get_internet_IP
from logger import mylog, print_log
from const import vendorsPath6, vendorsPath9
@@ -13,13 +12,9 @@
def save_scanned_devices (db):
sql = db.sql #TO-DO
- cycle = 1 # always 1, only one cycle supported
- # handled by the ARPSCAN plugin
- # handled by the Pi-hole plugin
# Check Internet connectivity
- internet_IP = get_internet_IP( conf.DIG_GET_IP_ARG )
+ internet_IP = get_internet_IP()
# TESTING - Force IP
# internet_IP = ""
if internet_IP != "" :
diff --git a/pialert/helper.py b/pialert/helper.py
index 2d11c440f..c48ded383 100755
--- a/pialert/helper.py
+++ b/pialert/helper.py
@@ -18,10 +18,10 @@
from const import *
from logger import mylog, logResult
+# DateTime
+# Get the current time in the current TimeZone
def timeNowTZ():
if isinstance(conf.TIMEZONE, str):
tz = pytz.timezone(conf.TIMEZONE)
@@ -34,6 +34,8 @@ def timeNow():
return datetime.datetime.now().replace(microsecond=0)
+# App state
# A class to manage the application state and to provide a frontend accessible API point
class app_state_class:
@@ -79,20 +81,14 @@ def isSet(self):
return result
-# Checks if the object has a __dict__ attribute. If it does, it assumes that it's an instance of a class and serializes its attributes dynamically.
-class AppStateEncoder(json.JSONEncoder):
- def default(self, obj):
- if hasattr(obj, '__dict__'):
- # If the object has a '__dict__', assume it's an instance of a class
- return obj.__dict__
- return super().default(obj)
+# method to update the state
def updateState(newState, settingsSaved = None, settingsImported = None, showSpinner = False):
state = app_state_class(newState, settingsSaved, settingsImported, showSpinner)
def updateSubnets(scan_subnets):
subnets = []
@@ -109,6 +105,8 @@ def updateSubnets(scan_subnets):
+# File system permission handling
# check RW access of DB and config file
def checkPermissionsOK():
@@ -175,7 +173,7 @@ def initialiseFile(pathToCheck, defaultFile):
mylog('none', ["[Setup] Error copying ("+defaultFile+"). Make sure the app has Read & Write access to " + pathToCheck])
mylog('none', [e.output])
def filePermissions():
# check and initialize pialert.conf
(confR_access, dbR_access) = checkPermissionsOK() # Initial check
@@ -192,162 +190,8 @@ def filePermissions():
-def bytes_to_string(value):
- # if value is of type bytes, convert to string
- if isinstance(value, bytes):
- value = value.decode('utf-8')
- return value
-def if_byte_then_to_str(input):
- if isinstance(input, bytes):
- input = input.decode('utf-8')
- input = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input)))
- return input
-def collect_lang_strings(db, json, pref, stringSqlParams):
- for prop in json["localized"]:
- for language_string in json[prop]:
- stringSqlParams.append((str(language_string["language_code"]), str(pref + "_" + prop), str(language_string["string"]), ""))
- return stringSqlParams
-# Creates a JSON object from a DB row
-def row_to_json(names, row):
- rowEntry = {}
- index = 0
- for name in names:
- rowEntry[name]= if_byte_then_to_str(row[name])
- index += 1
- return rowEntry
-def checkIPV4(ip):
- """ Define a function to validate an Ip address
- """
- ipRegex = "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$"
- if(re.search(ipRegex, ip)):
- return True
- else:
- return False
-def isNewVersion(newVersion: bool):
- mylog('debug', [f"[Version check] New version available? {newVersion}"])
- if newVersion == False:
- f = open(pialertPath + '/front/buildtimestamp.txt', 'r')
- buildTimestamp = int(f.read().strip())
- f.close()
- data = ""
- try:
- url = requests.get("https://api.github.com/repos/jokob-sk/Pi.Alert/releases")
- text = url.text
- data = json.loads(text)
- except requests.exceptions.ConnectionError as e:
- mylog('minimal', [" Couldn't check for new release."])
- data = ""
- # make sure we received a valid response and not an API rate limit exceeded message
- if data != "" and len(data) > 0 and isinstance(data, list) and "published_at" in data[0]:
- dateTimeStr = data[0]["published_at"]
- realeaseTimestamp = int(datetime.datetime.strptime(dateTimeStr, '%Y-%m-%dT%H:%M:%SZ').strftime('%s'))
- if realeaseTimestamp > buildTimestamp + 600:
- mylog('none', ["[Version check] New version of the container available!"])
- newVersion = True
- return newVersion
-def hide_email(email):
- m = email.split('@')
- if len(m) == 2:
- return f'{m[0][0]}{"*"*(len(m[0])-2)}{m[0][-1] if len(m[0]) > 1 else ""}@{m[1]}'
- return email
-def removeDuplicateNewLines(text):
- if "\n\n\n" in text:
- return removeDuplicateNewLines(text.replace("\n\n\n", "\n\n"))
- else:
- return text
-def add_json_list (row, list):
- new_row = []
- for column in row :
- column = bytes_to_string(column)
- new_row.append(column)
- list.append(new_row)
- return list
-def sanitize_string(input):
- if isinstance(input, bytes):
- input = input.decode('utf-8')
- value = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input)))
- return value
-def generate_mac_links (html, deviceUrl):
- p = re.compile(r'(?:[0-9a-fA-F]:?){12}')
- MACs = re.findall(p, html)
- for mac in MACs:
- html = html.replace('' + mac + ' | ','' + mac + ' | ')
- return html
-def initOrSetParam(db, parID, parValue):
- sql = db.sql
- sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'")
- db.commitDB()
+# File manipulation methods
-class json_struc:
- def __init__(self, jsn, columnNames):
- self.json = jsn
- self.columnNames = columnNames
def get_file_content(path):
@@ -382,16 +226,8 @@ def write_file(pPath, pText):
-class noti_struc:
- def __init__(self, json, text, html):
- self.json = json
- self.text = text
- self.html = html
+# Setting methods
-def isJsonObject(value):
- return isinstance(value, dict)
# Return whole setting touple
def get_setting(key):
@@ -423,6 +259,100 @@ def get_setting_value(key):
return ''
+# IP validation methods
+def checkIPV4(ip):
+ """ Define a function to validate an Ip address
+ """
+ ipRegex = "^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$"
+ if(re.search(ipRegex, ip)):
+ return True
+ else:
+ return False
+def check_IP_format (pIP):
+ # Check IP format
+ IPv4SEG = r'(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])'
+ IPv4ADDR = r'(?:(?:' + IPv4SEG + r'\.){3,3}' + IPv4SEG + r')'
+ IP = re.search(IPv4ADDR, pIP)
+ # Return error if not IP
+ if IP is None :
+ return ""
+ # Return IP
+ return IP.group(0)
+def get_internet_IP ():
+ # BUGFIX #46 - curl http://ipv4.icanhazip.com repeatedly is very slow
+ # Using 'dig'
+ dig_args = ['dig', '+short'] + conf.DIG_GET_IP_ARG.strip().split()
+ try:
+ cmd_output = subprocess.check_output (dig_args, universal_newlines=True)
+ except subprocess.CalledProcessError as e:
+ mylog('none', [e.output])
+ cmd_output = '' # no internet
+ # Check result is an IP
+ IP = check_IP_format (cmd_output)
+ # Handle invalid response
+ if IP == '':
+ IP = ''
+ return IP
+def resolve_device_name_dig (pMAC, pIP):
+ newName = ""
+ try :
+ dig_args = ['dig', '+short', '-x', pIP]
+ # Execute command
+ try:
+ # try runnning a subprocess
+ newName = subprocess.check_output (dig_args, universal_newlines=True)
+ except subprocess.CalledProcessError as e:
+ # An error occured, handle it
+ mylog('none', ['[device_name_dig] ', e.output])
+ # newName = "Error - check logs"
+ return -1
+ # Check returns
+ newName = newName.strip()
+ if len(newName) == 0 :
+ return -1
+ # Cleanup
+ newName = cleanResult(newName)
+ if newName == "" or len(newName) == 0:
+ return -1
+ # Return newName
+ return newName
+ # not Found
+ except subprocess.CalledProcessError :
+ return -1
+# DNS record (Pholus/Name resolution) cleanup methods
# Disclaimer - I'm interfacing with a script I didn't write (pholus3.py) so it's possible I'm missing types of answers
# it's also possible the pholus3.py script can be adjusted to provide a better output to interface with it
@@ -498,43 +428,7 @@ def resolve_device_name_pholus (pMAC, pIP, allRes):
return -1
-def resolve_device_name_dig (pMAC, pIP):
- newName = ""
- try :
- dig_args = ['dig', '+short', '-x', pIP]
- # Execute command
- try:
- # try runnning a subprocess
- newName = subprocess.check_output (dig_args, universal_newlines=True)
- except subprocess.CalledProcessError as e:
- # An error occured, handle it
- mylog('none', ['[device_name_dig] ', e.output])
- # newName = "Error - check logs"
- return -1
- # Check returns
- newName = newName.strip()
- if len(newName) == 0 :
- return -1
- # Cleanup
- newName = cleanResult(newName)
- if newName == "" or len(newName) == 0:
- return -1
- # Return newName
- return newName
- # not Found
- except subprocess.CalledProcessError :
- return -1
def cleanResult(str):
@@ -552,4 +446,178 @@ def cleanResult(str):
if str.endswith('.'):
str = str[:-1]
- return str
\ No newline at end of file
+ return str
+# String manipulation methods
+def bytes_to_string(value):
+ # if value is of type bytes, convert to string
+ if isinstance(value, bytes):
+ value = value.decode('utf-8')
+ return value
+def if_byte_then_to_str(input):
+ if isinstance(input, bytes):
+ input = input.decode('utf-8')
+ input = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input)))
+ return input
+def hide_email(email):
+ m = email.split('@')
+ if len(m) == 2:
+ return f'{m[0][0]}{"*"*(len(m[0])-2)}{m[0][-1] if len(m[0]) > 1 else ""}@{m[1]}'
+ return email
+def removeDuplicateNewLines(text):
+ if "\n\n\n" in text:
+ return removeDuplicateNewLines(text.replace("\n\n\n", "\n\n"))
+ else:
+ return text
+def sanitize_string(input):
+ if isinstance(input, bytes):
+ input = input.decode('utf-8')
+ value = bytes_to_string(re.sub('[^a-zA-Z0-9-_\s]', '', str(input)))
+ return value
+def generate_mac_links (html, deviceUrl):
+ p = re.compile(r'(?:[0-9a-fA-F]:?){12}')
+ MACs = re.findall(p, html)
+ for mac in MACs:
+ html = html.replace('' + mac + ' | ','' + mac + ' | ')
+ return html
+# JSON methods
+def isJsonObject(value):
+ return isinstance(value, dict)
+def add_json_list (row, list):
+ new_row = []
+ for column in row :
+ column = bytes_to_string(column)
+ new_row.append(column)
+ list.append(new_row)
+ return list
+# Checks if the object has a __dict__ attribute. If it does, it assumes that it's an instance of a class and serializes its attributes dynamically.
+class AppStateEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if hasattr(obj, '__dict__'):
+ # If the object has a '__dict__', assume it's an instance of a class
+ return obj.__dict__
+ return super().default(obj)
+# Creates a JSON object from a DB row
+def row_to_json(names, row):
+ rowEntry = {}
+ index = 0
+ for name in names:
+ rowEntry[name]= if_byte_then_to_str(row[name])
+ index += 1
+ return rowEntry
+# Get language strings from plugin JSON
+def collect_lang_strings(json, pref, stringSqlParams):
+ for prop in json["localized"]:
+ for language_string in json[prop]:
+ stringSqlParams.append((str(language_string["language_code"]), str(pref + "_" + prop), str(language_string["string"]), ""))
+ return stringSqlParams
+# Misc
+def isNewVersion(newVersion: bool):
+ mylog('debug', [f"[Version check] New version available? {newVersion}"])
+ if newVersion == False:
+ f = open(pialertPath + '/front/buildtimestamp.txt', 'r')
+ buildTimestamp = int(f.read().strip())
+ f.close()
+ data = ""
+ try:
+ url = requests.get("https://api.github.com/repos/jokob-sk/Pi.Alert/releases")
+ text = url.text
+ data = json.loads(text)
+ except requests.exceptions.ConnectionError as e:
+ mylog('minimal', [" Couldn't check for new release."])
+ data = ""
+ # make sure we received a valid response and not an API rate limit exceeded message
+ if data != "" and len(data) > 0 and isinstance(data, list) and "published_at" in data[0]:
+ dateTimeStr = data[0]["published_at"]
+ realeaseTimestamp = int(datetime.datetime.strptime(dateTimeStr, '%Y-%m-%dT%H:%M:%SZ').strftime('%s'))
+ if realeaseTimestamp > buildTimestamp + 600:
+ mylog('none', ["[Version check] New version of the container available!"])
+ newVersion = True
+ return newVersion
+def initOrSetParam(db, parID, parValue):
+ sql = db.sql
+ sql.execute ("INSERT INTO Parameters(par_ID, par_Value) VALUES('"+str(parID)+"', '"+str(parValue)+"') ON CONFLICT(par_ID) DO UPDATE SET par_Value='"+str(parValue)+"' where par_ID='"+str(parID)+"'")
+ db.commitDB()
+class json_struc:
+ def __init__(self, jsn, columnNames):
+ self.json = jsn
+ self.columnNames = columnNames
+class noti_struc:
+ def __init__(self, json, text, html):
+ self.json = json
+ self.text = text
+ self.html = html
\ No newline at end of file
diff --git a/pialert/initialise.py b/pialert/initialise.py
index b33ef74ad..27d45e200 100755
--- a/pialert/initialise.py
+++ b/pialert/initialise.py
@@ -78,6 +78,7 @@ def importConfigs (db):
# remove all plugin langauge strings
sql.execute("DELETE FROM Plugins_Language_Strings;")
+ db.commitDB()
mylog('debug', ['[Import Config] importing config file'])
conf.mySettings = [] # reset settings
@@ -169,8 +170,6 @@ def importConfigs (db):
conf.time_started = datetime.datetime.now(conf.tz)
conf.cycle = ""
conf.plugins_once_run = False
- #cron_instance = Cron()
# timestamps of last execution times
conf.startTime = conf.time_started
@@ -178,8 +177,7 @@ def importConfigs (db):
# set these times to the past to force the first run
conf.last_internet_IP_scan = now_minus_24h
- conf.last_scan_run = now_minus_24h
- conf.last_update_vendors = conf.time_started - datetime.timedelta(days = 6) # update vendors 24h after first run and then once a week
+ conf.last_scan_run = now_minus_24h
conf.last_version_check = now_minus_24h
# TODO cleanup later ----------------------------------------------------------------------------------
@@ -211,7 +209,7 @@ def importConfigs (db):
stringSqlParams = []
# collect plugin level language strings
- stringSqlParams = collect_lang_strings(db, plugin, pref, stringSqlParams)
+ stringSqlParams = collect_lang_strings(plugin, pref, stringSqlParams)
for set in plugin["settings"]:
setFunction = set["function"]
@@ -242,12 +240,12 @@ def importConfigs (db):
# Collect settings related language strings
# Creates an entry with key, for example ARPSCAN_CMD_name
- stringSqlParams = collect_lang_strings(db, set, pref + "_" + set["function"], stringSqlParams)
+ stringSqlParams = collect_lang_strings(set, pref + "_" + set["function"], stringSqlParams)
# Collect column related language strings
for clmn in plugin.get('database_column_definitions', []):
# Creates an entry with key, for example ARPSCAN_Object_PrimaryID_name
- stringSqlParams = collect_lang_strings(db, clmn, pref + "_" + clmn.get("column", ""), stringSqlParams)
+ stringSqlParams = collect_lang_strings(clmn, pref + "_" + clmn.get("column", ""), stringSqlParams)
# bulk-import language strings
sql.executemany ("""INSERT INTO Plugins_Language_Strings ("Language_Code", "String_Key", "String_Value", "Extra") VALUES (?, ?, ?, ?)""", stringSqlParams )
diff --git a/pialert/reporting.py b/pialert/reporting.py
index e9777e2cd..488dc987e 100755
--- a/pialert/reporting.py
+++ b/pialert/reporting.py
@@ -283,7 +283,7 @@ def send_notifications (db):
write_file (logPath + '/report_output.html', mail_html)
# Send Mail
- if json_internet != [] or json_new_devices != [] or json_down_devices != [] or json_events != [] or json_ports != [] or conf.debug_force_notification or plugins_report:
+ if json_internet != [] or json_new_devices != [] or json_down_devices != [] or json_events != [] or json_ports != [] or plugins_report:
mylog('none', ['[Notification] Changes detected, sending reports'])
diff --git a/pialert/scanners/internet.py b/pialert/scanners/internet.py
index e5eae0d46..6e85e4d20 100755
--- a/pialert/scanners/internet.py
+++ b/pialert/scanners/internet.py
@@ -6,15 +6,11 @@
# pialert modules
import conf
-from helper import timeNowTZ, updateState
+from helper import timeNowTZ, updateState, check_IP_format, get_internet_IP
from logger import append_line_to_file, mylog
from const import logPath
-# need to find a better way to deal with settings !
@@ -26,7 +22,7 @@ def check_internet_IP ( db ):
# Get Internet IP
mylog('verbose', ['[Internet IP] - Retrieving Internet IP'])
- internet_IP = get_internet_IP(conf.DIG_GET_IP_ARG)
+ internet_IP = get_internet_IP()
# TESTING - Force IP
# internet_IP = ""
@@ -72,25 +68,6 @@ def check_internet_IP ( db ):
-def get_internet_IP (DIG_GET_IP_ARG):
- # BUGFIX #46 - curl http://ipv4.icanhazip.com repeatedly is very slow
- # Using 'dig'
- dig_args = ['dig', '+short'] + DIG_GET_IP_ARG.strip().split()
- try:
- cmd_output = subprocess.check_output (dig_args, universal_newlines=True)
- except subprocess.CalledProcessError as e:
- mylog('none', [e.output])
- cmd_output = '' # no internet
- # Check result is an IP
- IP = check_IP_format (cmd_output)
- # Handle invalid response
- if IP == '':
- IP = ''
- return IP
def get_previous_internet_IP (db):
@@ -134,21 +111,6 @@ def save_new_internet_IP (db, pNewIP):
# commit changes
-def check_IP_format (pIP):
- # Check IP format
- IPv4SEG = r'(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])'
- IPv4ADDR = r'(?:(?:' + IPv4SEG + r'\.){3,3}' + IPv4SEG + r')'
- IP = re.search(IPv4ADDR, pIP)
- # Return error if not IP
- if IP is None :
- return ""
- # Return IP
- return IP.group(0)
def get_dynamic_DNS_IP ():