From 076242f7a7b1bd0de14f30d06764ac87125faea4 Mon Sep 17 00:00:00 2001 From: mal Date: Sat, 14 May 2022 05:36:31 +0000 Subject: [PATCH] safety: rework --- sopel/modules/safety.py | 322 ++++++++++++++++++++++++++-------------- sopel/modules/url.py | 18 ++- 2 files changed, 223 insertions(+), 117 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index 9537eea2ad..2311a6f7b7 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -7,16 +7,17 @@ """ from __future__ import annotations +from base64 import urlsafe_b64encode import json import logging import os.path import re import threading import time +from typing import Dict, Optional from urllib.parse import urlparse from urllib.request import urlretrieve - import requests from sopel import formatting, plugin, tools @@ -26,56 +27,107 @@ LOGGER = logging.getLogger(__name__) PLUGIN_OUTPUT_PREFIX = '[safety] ' -vt_base_api_url = 'https://www.virustotal.com/vtapi/v2/url/' -malware_domains = set() +SAFETY_MODES = ["off", "local", "local strict", "on", "strict"] +VT_API_URL = b"https://www.virustotal.com/api/v3/urls" known_good = [] cache_limit = 512 +requests_session = requests.Session() class SafetySection(types.StaticSection): - enabled_by_default = types.BooleanAttribute('enabled_by_default', default=True) - """Whether to enable URL safety in all channels where it isn't explicitly disabled.""" + enabled_by_default = types.ValidatedAttribute("enabled_by_default") + """Deprecated: Sets default_mode to "on".""" + default_mode = types.ValidatedAttribute("default_mode") + """Which mode to use in channels without a mode set.""" known_good = types.ListAttribute('known_good') """List of "known good" domains to ignore.""" vt_api_key = types.ValidatedAttribute('vt_api_key') """Optional VirusTotal API key (improves malicious URL detection).""" + domain_blocklist_url = types.ValidatedAttribute('domain_blocklist_url') + """Optional hosts-file formatted domain blocklist to use instead of StevenBlack's.""" -def configure(config): +def configure(settings): """ | name | example | purpose | | ---- | ------- | ------- | - | enabled\\_by\\_default | True | Enable URL safety in all channels where it isn't explicitly disabled. | - | known\\_good | sopel.chat,dftba.net | List of "known good" domains to ignore. | + | default\\_mode | on | Which mode to use in channels without a mode set. | + | known\\_good | sopel.chat,dftba.net | List of "known good" domains or regexes to ignore. | | vt\\_api\\_key | 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef | Optional VirusTotal API key to improve malicious URL detection | + | domain\\_blocklist\\_url | https://example.com/bad-hosts.txt | Optional hosts-file formatted domain blocklist to use instead of StevenBlack's. | """ - config.define_section('safety', SafetySection) - config.safety.configure_setting( - 'enabled_by_default', - "Enable URL safety in channels that don't specifically disable it?", + settings.define_section("safety", SafetySection) + settings.safety.configure_setting( + "default_mode", + ( + "Which mode should be used in channels that haven't specifically set one?" + "\n({})".format("/".join(SAFETY_MODES)) + ), + default="on", ) - config.safety.configure_setting( + settings.safety.configure_setting( 'known_good', - 'Enter any domains to whitelist', + "Enter any domains to allowlist", ) - config.safety.configure_setting( + settings.safety.configure_setting( 'vt_api_key', "Optionally, enter a VirusTotal API key to improve malicious URL " - "protection.\nOtherwise, only the StevenBlack list will be used." + "protection.\nOtherwise, only the configured hosts list will be used.", + ) + settings.safety.configure_setting( + 'domain_blocklist_url', + "Optionally, provide the URL for a hosts-file formatted domain " + "blocklist to use instead of StevenBlack's.", ) def setup(bot): - bot.config.define_section('safety', SafetySection) + bot.settings.define_section("safety", SafetySection) + + if bot.settings.safety.default_mode is None: + bot.settings.safety.default_mode = "on" + # migrate from enabled_by_default to default_mode + if bot.settings.safety.enabled_by_default is not None: + if bot.settings.safety.enabled_by_default.lower() not in [ + "1", + "enable", + "enabled", + "on", + "true", + "y", + "yes", + ]: + bot.settings.safety.default_mode = "off" + bot.settings.safety.enabled_by_default = None + LOGGER.info( + "config: enabled_by_default is deprecated, please use default_mode=%s", + bot.settings.safety.default_mode, + ) + + if bot.settings.safety.vt_api_key is not None: + requests_session.headers = { + "x-apikey": bot.settings.safety.vt_api_key, + } if 'safety_cache' not in bot.memory: bot.memory['safety_cache'] = tools.SopelMemory() if 'safety_cache_lock' not in bot.memory: bot.memory['safety_cache_lock'] = threading.Lock() - for item in bot.config.safety.known_good: + for item in bot.settings.safety.known_good: known_good.append(re.compile(item, re.I)) - old_file = os.path.join(bot.config.homedir, 'malwaredomains.txt') + update_local_cache(bot, init=True) + + +def update_local_cache(bot, init: bool = False): + """Download the current malware domain list and load it into memory. + + :param init: Load the file even if it's unchanged + """ + + malware_domains = set() + + old_file = os.path.join(bot.settings.homedir, "malwaredomains.txt") if os.path.exists(old_file) and os.path.isfile(old_file): LOGGER.info('Removing old malwaredomains file from %s', old_file) try: @@ -85,13 +137,18 @@ def setup(bot): # Python on Windows throws an exception if the file is in use LOGGER.info('Could not delete %s: %s', old_file, str(err)) - loc = os.path.join(bot.config.homedir, 'unsafedomains.txt') - if os.path.isfile(loc): - if os.path.getmtime(loc) < time.time() - 24 * 60 * 60: - # File exists but older than one day — update it - _download_domain_list(loc) - else: - _download_domain_list(loc) + loc = os.path.join(bot.settings.homedir, "unsafedomains.txt") + if not os.path.isfile(loc) or os.path.getmtime(loc) < time.time() - 24 * 60 * 60: + # File doesn't exist or is older than one day — update it + url = bot.settings.safety.domain_blocklist_url + if url is None or not url.startswith("http"): + url = "https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts" + LOGGER.info("Downloading malicious domain list from %s", url) + # TODO: Can we use a cache header to avoid the download if it's unmodified? + urlretrieve(url, loc) + elif not init: + return + with open(loc, 'r') as f: for line in f: clean_line = str(line).strip().lower() @@ -110,104 +167,129 @@ def setup(bot): # only publicly routable domains matter; skip loopback/link-local stuff malware_domains.add(domain) + bot.memory["safety_cache_local"] = malware_domains + def shutdown(bot): bot.memory.pop('safety_cache', None) + bot.memory.pop('safety_cache_local', None) bot.memory.pop('safety_cache_lock', None) -def _download_domain_list(path): - url = 'https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts' - LOGGER.info('Downloading malicious domain list from %s', url) - urlretrieve(url, path) - - @plugin.rule(r'(?u).*(https?://\S+).*') @plugin.priority('high') @plugin.output_prefix(PLUGIN_OUTPUT_PREFIX) def url_handler(bot, trigger): """Checks for malicious URLs""" - check = True # Enable URL checking - strict = False # Strict mode: kick on malicious URL - positives = 0 # Number of engines saying it's malicious - total = 0 # Number of total engines - use_vt = True # Use VirusTotal - check = bot.config.safety.enabled_by_default - if check is None: - # If not set, assume default - check = True - # DB overrides config: - setting = bot.db.get_channel_value(trigger.sender, 'safety') - if setting is not None: - if setting == 'off': - return # Not checking - elif setting in ['on', 'strict', 'local', 'local strict']: - check = True - if setting == 'strict' or setting == 'local strict': - strict = True - if setting == 'local' or setting == 'local strict': - use_vt = False - - if not check: - return # Not overridden by DB, configured default off + mode = bot.db.get_channel_value( + trigger.sender, + "safety", + bot.settings.safety.default_mode, + ) + if mode == "off": + return + local_only = "local" in mode or bot.settings.safety.vt_api_key is None + strict = "strict" in mode - try: - netloc = urlparse(trigger.group(1)).netloc - except ValueError: - return # Invalid IPv6 URL + for url in tools.web.search_urls(trigger): + safe_url = "hxx" + url[3:] + + positives = 0 # Number of engines saying it's malicious + total = 0 # Number of total engines - if any(regex.search(netloc) for regex in known_good): - return # Whitelisted + try: + netloc = urlparse(url).netloc.lower() + except ValueError: + pass # Invalid address + else: + if any(regex.search(netloc) for regex in known_good): + continue # explicitly allowed + + if netloc in bot.memory["safety_cache_local"]: + LOGGER.debug("[local] domain in blocklist: %r", netloc) + positives += 1 + total += 1 + + result = virustotal_lookup(bot, url, local_only=local_only) + if result: + positives += result["positives"] + total += result["total"] + + if positives >= 1: + # Possibly malicious URL detected! + LOGGER.info( + "Possibly malicious link (%s/%s) posted in %s by %s: %r", + positives, + total, + trigger.sender, + trigger.nick, + safe_url, + ) + bot.say( + "{} {} of {} engine{} flagged a link {} posted as malicious".format( + formatting.bold(formatting.color("WARNING:", "red")), + positives, + total, + "" if total == 1 else "s", + formatting.bold(trigger.nick), + ) + ) + if strict: + bot.kick(trigger.nick, trigger.sender, 'Posted a malicious link') + + +def virustotal_lookup(bot, url: str, local_only: bool = False) -> Optional[Dict]: + """Check VirusTotal for flags on a URL as malicious. + + :param url: The URL to look up + :param local_only: If set, only check cache, do not make a new request. + :returns: A dict containing information about findings, or None if not found. + """ + safe_url = "hxx" + url[3:] - apikey = bot.config.safety.vt_api_key + if url in bot.memory["safety_cache"]: + LOGGER.debug("[VirusTotal] Using cached data for %r", safe_url) + return bot.memory["safety_cache"].get(url) + if local_only: + return None + + LOGGER.debug("[VirusTotal] Looking up %r", safe_url) + url_id = urlsafe_b64encode(url.encode("utf-8")).rstrip(b"=") try: - if apikey is not None and use_vt: - payload = {'resource': str(trigger), - 'apikey': apikey, - 'scan': '1'} - - if trigger not in bot.memory['safety_cache']: - r = requests.post(vt_base_api_url + 'report', data=payload) - r.raise_for_status() - result = r.json() - fetched = time.time() - if all(k in result for k in ['positives', 'total']): - # cache result only if it contains a scan report - # TODO: handle checking back for results from queued scans - data = {'positives': result['positives'], - 'total': result['total'], - 'fetched': fetched} - bot.memory['safety_cache'][trigger] = data - if len(bot.memory['safety_cache']) >= (2 * cache_limit): - _clean_cache(bot) - else: - LOGGER.debug('using cache') - result = bot.memory['safety_cache'][trigger] - positives = result.get('positives', 0) - total = result.get('total', 0) + r = requests_session.get(VT_API_URL + b"/" + url_id) + if r.status_code == 404: + # Not analyzed - submit new + LOGGER.debug("[VirusTotal] No scan for %r, requesting", safe_url) + # TODO: handle checking back for results from queued scans + r = requests_session.post(VT_API_URL, data={"url": url}) + return None + r.raise_for_status() + vt_data = r.json() except requests.exceptions.RequestException: # Ignoring exceptions with VT so domain list will always work - LOGGER.debug('[VirusTotal] Error obtaining response.', exc_info=True) + LOGGER.debug( + "[VirusTotal] Error obtaining response for %r", safe_url, exc_info=True + ) except json.JSONDecodeError: # Ignoring exceptions with VT so domain list will always work - LOGGER.debug('[VirusTotal] Malformed response (invalid JSON).', exc_info=True) - - if str(netloc).lower() in malware_domains: - positives += 1 - total += 1 - - if positives >= 1: - # Possibly malicious URL detected! - confidence = '{}%'.format(round((positives / total) * 100)) - msg = ( - 'link posted by %s is possibly malicious ' - % formatting.bold(trigger.nick) + LOGGER.debug( + "[VirusTotal] Malformed response (invalid JSON) for %r", + safe_url, + exc_info=True, ) - msg += '(confidence %s - %s/%s)' % (confidence, positives, total) - warning = formatting.bold(formatting.color('WARNING:', 'red')) - bot.say(warning + ' ' + msg) - if strict: - bot.kick(trigger.nick, trigger.sender, 'Posted a malicious link') + fetched = time.time() + last_analysis = vt_data["data"]["attributes"]["last_analysis_stats"] + # Only count strong opinions (ignore suspicious/timeout/undetected) + result = { + "positives": last_analysis["malicious"], + "total": last_analysis["malicious"] + last_analysis["harmless"], + "fetched": fetched, + "virustotal_data": vt_data, + } + bot.memory['safety_cache'][url] = result + if len(bot.memory['safety_cache']) >= (2 * cache_limit): + _clean_cache(bot) + return result @plugin.command('safety') @@ -217,15 +299,30 @@ def toggle_safety(bot, trigger): if not trigger.admin and bot.channels[trigger.sender].privileges[trigger.nick] < plugin.OP: bot.reply('Only channel operators can change safety settings') return - allowed_states = ['strict', 'on', 'off', 'local', 'local strict'] - if not trigger.group(2) or trigger.group(2).lower() not in allowed_states: - options = ' / '.join(allowed_states) - bot.reply('Available options: %s' % options) + + new_mode = None + if trigger.group(2): + new_mode = trigger.group(2).lower() + + if not new_mode or (new_mode != "default" and new_mode not in SAFETY_MODES): + bot.reply( + "Current mode: {}. Available modes: {}, or default ({})".format( + bot.db.get_channel_value( + trigger.sender, + "safety", + "default", + ), + ", ".join(SAFETY_MODES), + bot.settings.safety.default_mode, + ) + ) return - channel = trigger.sender.lower() - bot.db.set_channel_value(channel, 'safety', trigger.group(2).lower()) - bot.say('Safety is now set to "%s" on this channel' % trigger.group(2)) + if new_mode == "default": + bot.db.delete_channel_value(trigger.sender, "safety") + else: + bot.db.set_channel_value(trigger.sender, "safety", new_mode) + bot.say('Safety is now set to "%s" for this channel' % new_mode) # Clean the cache every day @@ -233,6 +330,9 @@ def toggle_safety(bot, trigger): @plugin.interval(24 * 60 * 60) def _clean_cache(bot): """Cleans up old entries in URL safety cache.""" + + update_local_cache(bot) + if bot.memory['safety_cache_lock'].acquire(False): LOGGER.info('Starting safety cache cleanup...') try: diff --git a/sopel/modules/url.py b/sopel/modules/url.py index 4825b00166..115bcd9df6 100644 --- a/sopel/modules/url.py +++ b/sopel/modules/url.py @@ -303,14 +303,20 @@ def title_auto(bot, trigger): if re.match(bot.config.core.prefix + r'\S+', trigger): return - # Avoid fetching known malicious links - if 'safety_cache' in bot.memory and trigger in bot.memory['safety_cache']: - if bot.memory['safety_cache'][trigger]['positives'] > 1: - return - - urls = web.search_urls( + unchecked_urls = web.search_urls( trigger, exclusion_char=bot.config.url.exclusion_char, clean=True) + urls = [] + for url in unchecked_urls: + # Avoid fetching known malicious links + if "safety_cache" in bot.memory and url in bot.memory["safety_cache"]: + if bot.memory["safety_cache"][url]["positives"] > 0: + continue + netloc = urlparse(url).netloc.lower() + if "safety_cache_local" in bot.memory and netloc in bot.memory["safety_cache_local"]: + continue + urls.append(url) + for url, title, domain, tinyurl in process_urls(bot, trigger, urls): message = '%s | %s' % (title, domain) if tinyurl: