Merge pull request #2285 from half-duplex/url-private-safety
url: improve private IP protection
dgw committed Oct 13, 2024
2 parents f3d3704 + acff99a commit 065165a
Showing 3 changed files with 142 additions and 88 deletions.
1 change: 0 additions & 1 deletion pyproject.toml
@@ -48,7 +48,6 @@ dependencies = [
"xmltodict>=0.12,<0.14",
"pytz",
"requests>=2.24.0,<3.0.0",
"dnspython<3.0",
"sqlalchemy>=1.4,<1.5",
"importlib_metadata>=3.6",
"packaging>=23.2",
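The dnspython requirement is dropped because, in the url.py changes below, hostname resolution moves from dns.resolver to the standard library's socket.getaddrinfo, which follows normal OS name resolution (including hosts files). A minimal sketch of that approach and the is_global check it feeds, not the plugin's exact code:

```python
# Sketch: resolve a hostname with the stdlib and check whether every result is
# globally routable. is_global excludes RFC 1918, loopback, link-local ranges,
# and their IPv6 equivalents.
from ipaddress import ip_address
from socket import getaddrinfo, IPPROTO_TCP

def is_public_host(hostname: str) -> bool:
    addr_info = getaddrinfo(hostname, 443, proto=IPPROTO_TCP)
    ips = [ip_address(info[4][0]) for info in addr_info]
    return all(ip.is_global for ip in ips)

print(is_public_host("example.com"))  # True for a typical public site
print(is_public_host("localhost"))    # False: resolves to loopback addresses
```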
218 changes: 131 additions & 87 deletions sopel/builtins/url.py
@@ -10,22 +10,24 @@
"""
from __future__ import annotations

from email.message import EmailMessage
from ipaddress import ip_address
import logging
import re
from socket import getaddrinfo, IPPROTO_TCP
from typing import NamedTuple, Optional, TYPE_CHECKING
from urllib.parse import urlparse

import dns.resolver
import requests
from urllib3.exceptions import LocationValueError # type: ignore[import]

from sopel import plugin, privileges, tools
from sopel.config import types
from sopel.tools import web

if TYPE_CHECKING:
from collections.abc import Generator
from typing import Iterable

from sopel.bot import Sopel, SopelWrapper
from sopel.config import Config
from sopel.trigger import Trigger
@@ -73,7 +75,14 @@ class UrlSection(types.StaticSection):
"""If greater than 0, the title fetcher will include a TinyURL version of links longer than this many characters."""
enable_private_resolution = types.BooleanAttribute(
'enable_private_resolution', default=False)
"""Enable requests to private and local network IP addresses"""
"""Allow all requests to private and loopback networks.
If disabled (the default), obvious attempts to load pages from loopback and
private IP addresses will be blocked. If this matters for your security, you
must use additional protections like a firewall and CSRF tokens, since an
attacker can change which IP address a domain name refers to between when
Sopel checks it and when the HTTP request is made.
"""


def configure(config: Config):
@@ -84,7 +93,7 @@ def configure(config: Config):
| exclude | https?://git\\\\.io/.* | A list of regular expressions for URLs for which the title should not be shown. |
| exclusion\\_char | ! | A character (or string) which, when immediately preceding a URL, will stop the URL's title from being shown. |
| shorten\\_url\\_length | 72 | If greater than 0, the title fetcher will include a TinyURL version of links longer than this many characters. |
| enable\\_private\\_resolution | False | Enable requests to private and local network IP addresses. |
| enable\\_private\\_resolution | False | Allow all requests to private IP addresses. Leaving this disabled only blocks obvious attempts; use a firewall! |
"""
config.define_section('url', UrlSection)
config.url.configure_setting(
@@ -107,7 +116,7 @@ def configure(config: Config):
)
config.url.configure_setting(
'enable_private_resolution',
'Enable requests to private and local network IP addresses?'
'Allow all requests to private (local network) IP addresses?'
)


@@ -286,7 +295,7 @@ def title_command(bot: SopelWrapper, trigger: Trigger):
urls = [bot.memory["last_seen_url"][trigger.sender]]
else:
# needs to be a list so len() can be checked later
urls = list(web.search_urls(trigger))
urls = list(tools.web.search_urls(trigger))

for url, title, domain, tinyurl, ignored in process_urls(
bot, trigger, urls, requested=True
@@ -332,21 +341,9 @@ def title_auto(bot: SopelWrapper, trigger: Trigger):
if re.match(bot.config.core.prefix + r'\S+', trigger):
return

unchecked_urls = web.search_urls(
urls = tools.web.search_urls(
trigger, exclusion_char=bot.config.url.exclusion_char, clean=True)

urls = []
safety_cache = bot.memory.get("safety_cache", {})
safety_cache_local = bot.memory.get("safety_cache_local", {})
for url in unchecked_urls:
# Avoid fetching known malicious links
if url in safety_cache and safety_cache[url]["positives"] > 0:
continue
parsed = urlparse(url)
if not parsed.hostname or parsed.hostname.lower() in safety_cache_local:
continue
urls.append(url)

for url, title, domain, tinyurl, ignored in process_urls(bot, trigger, urls):
if not ignored:
message = '%s | %s' % (title, domain)
@@ -416,63 +413,35 @@ def process_urls(
if not requested and url.startswith(bot.config.url.exclusion_char):
continue

parsed_url = urlparse(url)

if check_callbacks(bot, url, use_excludes=not requested):
# URL matches a callback OR is excluded, ignore
yield URLInfo(url, None, None, None, True)
continue

# Prevent private addresses from being queried if enable_private_resolution is False
# FIXME: This does nothing when an attacker knows how to host a 302
# FIXME: This whole concept has a TOCTOU issue
if not bot.config.url.enable_private_resolution:
if not parsed_url.hostname:
# URL like file:///path is a valid local path (i.e. private)
LOGGER.debug("Ignoring private URL: %s", url)
continue

try:
ips = [ip_address(parsed_url.hostname)]
except ValueError:
# Extra try/except here in case the DNS resolution fails, see #2348
try:
ips = [
ip_address(ip.to_text())
for ip in dns.resolver.resolve(parsed_url.hostname)
]
except Exception as exc:
LOGGER.debug(
"Cannot resolve hostname %s, ignoring URL %s"
" (exception was: %r)",
parsed_url.hostname,
url,
exc,
)
continue

private = False
for ip in ips:
if ip.is_private or ip.is_loopback:
private = True
break
if private:
LOGGER.debug("Ignoring private URL: %s", url)
continue

# Call the URL to get a title, if possible
title = find_title(url)
if not title:
unsafe_urls = [
url
for url, data in bot.memory.get("safety_cache", {}).items()
if data.get("positives")
]
title_results = find_title(
url,
allow_local=bot.config.url.enable_private_resolution,
unsafe_urls=unsafe_urls,
unsafe_domains=bot.memory.get("safety_cache_local", set()),
)
if not title_results:
# No title found: don't handle this URL
LOGGER.debug('No title found; ignoring URL: %s', url)
continue
title, final_hostname = title_results

# If the URL is over bot.config.url.shorten_url_length, shorten the URL
tinyurl = None
if (shorten_url_length > 0) and (len(url) > shorten_url_length):
tinyurl = get_or_create_shorturl(bot, url)

yield URLInfo(url, title, parsed_url.hostname, tinyurl, False)
yield URLInfo(url, title, final_hostname, tinyurl, False)


def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> bool:
@@ -510,32 +479,107 @@ def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> bool:
)


def find_title(url: str, verify: bool = True) -> Optional[str]:
"""Return the title for the given URL.
def find_title(
url: str,
verify: bool = True,
allow_local: bool = False,
unsafe_urls: Iterable[str] = [],
unsafe_domains: Iterable[str] = [],
) -> Optional[tuple[str, str]]:
"""Fetch the title for the given URL.
:param verify: Whether to require a valid certificate when using https
:param allow_local: Allow requests to non-global addresses (RFC1918, etc.)
:param unsafe_urls: An iterable of URLs to consider malicious and ignore
:param unsafe_domains: An iterable of domains to consider malicious and ignore
:return: A tuple of the (title, final_hostname) that were found, or None
"""
try:
response = requests.get(url, stream=True, verify=verify,
headers=DEFAULT_HEADERS)
raw_content = b''
for byte in response.iter_content(chunk_size=512):
raw_content += byte
if b'</title>' in raw_content or len(raw_content) > MAX_BYTES:
break
content = raw_content.decode('utf-8', errors='ignore')
# Need to close the connection because we have not read all
# the data
response.close()
except requests.exceptions.ConnectionError as e:
LOGGER.debug("Unable to reach URL: %r: %s", url, e)
return None
except (
requests.exceptions.InvalidURL, # e.g. http:///
UnicodeError, # e.g. http://.example.com (urllib3<1.26)
LocationValueError, # e.g. http://.example.com (urllib3>=1.26)
):
LOGGER.debug('Invalid URL: %s', url)
original_url = url
redirects_left = 5
session = requests.Session()
session.headers = dict(DEFAULT_HEADERS)
while redirects_left > 0:
redirects_left -= 1
parsed_url = urlparse(url)
if not parsed_url.hostname:
return None

# Avoid fetching known malicious links
if url in unsafe_urls:
LOGGER.debug("Ignoring unsafe URL: %r", url)
return None
if parsed_url.hostname.lower() in unsafe_domains:
LOGGER.debug("Ignoring unsafe domain: %r", url)
return None

# Prevent private addresses from being queried
try:
# If link is to an IP
ips = [ip_address(parsed_url.hostname)]
except ValueError: # Nope, hostname
try:
# getaddrinfo instead of dns.resolver so we use normal OS
# name resolution, including hosts files.
addr_info = getaddrinfo(parsed_url.hostname, 443, proto=IPPROTO_TCP)
ips = [ip_address(info[4][0]) for info in addr_info]
except Exception as e:
LOGGER.debug("Failed to get IPs for %r: %s", url, e)
return None

# is_global excludes RFC1918, loopback, link-local, and v6 equivalents
if not allow_local and not all(ip.is_global for ip in ips):
LOGGER.debug(
"Ignoring private URL %r%s which resolved to %s",
url,
"" if url == original_url else " (redirected from %r)" % original_url,
", ".join([str(ip) for ip in ips]),
)
return None

try:
response = session.get(
url,
stream=True,
verify=verify,
allow_redirects=False,
)
if response.is_redirect:
LOGGER.debug(
"URL %r redirected to %r", url, response.headers.get("Location")
)
if "Location" not in response.headers:
return None
url = response.headers["Location"]
continue

content_bytes = b''
for chunk in response.iter_content(chunk_size=512):
content_bytes += chunk
if b"</title>" in content_bytes or len(content_bytes) > MAX_BYTES:
break

encoding = None
if "Content-Type" in response.headers:
msg = EmailMessage()
msg["Content-Type"] = response.headers["Content-Type"]
encoding = msg.get_content_charset()
content = content_bytes.decode(encoding or "utf-8", errors="ignore")

# Need to close the connection because we haven't read all the data
response.close()
except requests.exceptions.ConnectionError as e:
LOGGER.debug("Unable to reach URL: %r: %s", url, e)
return None
except (
requests.exceptions.InvalidURL, # e.g. http:///
UnicodeError, # e.g. http://.example.com (urllib3<1.26)
LocationValueError, # e.g. http://.example.com (urllib3>=1.26)
):
LOGGER.debug('Invalid URL: %s', url)
return None
break
else:
LOGGER.debug("Redirects exhausted for %r", original_url)
return None

# Some cleanup that I don't really grok, but was in the original, so
@@ -548,12 +592,12 @@ def find_title(url: str, verify: bool = True) -> Optional[str]:
if start == -1 or end == -1:
return None

title = web.decode(content[start + 7:end])
title = tools.web.decode(content[start + 7:end])
title = title.strip()[:200]

title = ' '.join(title.split()) # cleanly remove multiple spaces

return title or None
return (title, parsed_url.hostname)
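The charset detection a few lines up leans on email.message.EmailMessage as a ready-made parser for the Content-Type header's parameters instead of splitting the header string by hand. A standalone illustration of that trick:

```python
# Parse the charset parameter out of an HTTP Content-Type header value.
from email.message import EmailMessage

msg = EmailMessage()
msg["Content-Type"] = "text/html; charset=ISO-8859-1"
print(msg.get_content_charset())  # -> iso-8859-1 (None if no charset is given)
```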


def get_or_create_shorturl(bot: SopelWrapper, url: str) -> Optional[str]:
@@ -580,7 +624,7 @@ def get_or_create_shorturl(bot: SopelWrapper, url: str) -> Optional[str]:
def get_tinyurl(url: str) -> Optional[str]:
"""Returns a shortened tinyURL link of the URL"""
base_url = "https://tinyurl.com/api-create.php"
tinyurl = "%s?%s" % (base_url, web.urlencode({'url': url}))
tinyurl = "%s?%s" % (base_url, tools.web.urlencode({'url': url}))
try:
res = requests.get(tinyurl)
res.raise_for_status()
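The central change in find_title() above is that redirects are no longer followed automatically: requests are made with allow_redirects=False, and each hop (up to five) is re-checked against the unsafe URL/domain lists and the private-address rules before the next request is sent. A condensed sketch of that loop, with a hypothetical is_allowed() callback standing in for those checks:

```python
# Condensed sketch of per-hop redirect handling (illustrative only; the real
# find_title() also reads the body, sniffs the charset, and extracts <title>).
from typing import Callable, Optional
import requests

def fetch_with_checks(
    url: str,
    is_allowed: Callable[[str], bool],  # hypothetical: unsafe/private-IP checks
    max_redirects: int = 5,
) -> Optional[requests.Response]:
    session = requests.Session()
    for _ in range(max_redirects):
        if not is_allowed(url):
            return None  # blocked before any request is made for this hop
        response = session.get(url, stream=True, allow_redirects=False)
        if not response.is_redirect:
            return response
        location = response.headers.get("Location")
        if location is None:
            return None
        url = location  # the redirect target is re-checked on the next pass
    return None  # redirects exhausted
```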
11 changes: 11 additions & 0 deletions test/builtins/test_builtins_url.py
@@ -22,6 +22,12 @@
"http://example..com/", # empty label
"http://?", # no host
)
PRIVATE_URLS = (
# "https://httpbin.org/redirect-to?url=http://127.0.0.1/", # online
"http://127.1.1.1/",
"http://10.1.1.1/",
"http://169.254.1.1/",
)


@pytest.fixture
@@ -76,6 +82,11 @@ def test_find_title_invalid(site):
assert url.find_title(site) is None


@pytest.mark.parametrize("site", PRIVATE_URLS)
def test_find_title_private(site):
assert url.find_title(site) is None


def test_check_callbacks(mockbot):
"""Test that check_callbacks works with both new & legacy URL callbacks."""
assert url.check_callbacks(mockbot, 'https://example.com/test')
