Skip to content

Commit

Permalink
Restrict amazonaws allowed certificate URLs.
Browse files Browse the repository at this point in the history
  • Loading branch information
pcraciunoiu committed May 21, 2023
1 parent c01fd52 commit b71b5f4
Showing 1 changed file with 50 additions and 32 deletions.
82 changes: 50 additions & 32 deletions django_ses/utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import base64
import logging
import re
import warnings
from builtins import bytes

from django_ses.deprecation import RemovedInDjangoSES20Warning

from urllib.error import URLError
from urllib.parse import urlparse
from urllib.request import urlopen
from urllib.error import URLError

from django.core.exceptions import ImproperlyConfigured

from django_ses import settings
from django_ses.deprecation import RemovedInDjangoSES20Warning

logger = logging.getLogger(__name__)

_CERT_CACHE = {}

SES_REGEX_CERT_URL = re.compile(
"(?i)^https://sns\.[a-z0-9\-]+\.amazonaws\.com(\.cn)?/SimpleNotificationService\-[a-z0-9]+\.pem$"
)


def clear_cert_cache():
"""Clear the certificate cache.
Expand Down Expand Up @@ -183,6 +187,17 @@ def _get_cert_url(self):
url_obj = urlparse(cert_url)
for trusted_domain in settings.EVENT_CERT_DOMAINS:
parts = trusted_domain.split(".")
if "amazonaws.com" in trusted_domain:
if not SES_REGEX_CERT_URL.match(cert_url):
if len(parts) < 4:
return None
else:
logger.warning('Possible security risk for: "%s"', cert_url)
logger.warning(
"It is strongly recommended to configure the full domain in EVENT_CERT_DOMAINS. "
"See v3.5.0 release notes for more details."
)

if url_obj.netloc.split(".")[-len(parts) :] == parts:
return cert_url

Expand All @@ -196,26 +211,28 @@ def _get_bytes_to_sign(self):

# Depending on the message type the fields to add to the message
# differ so we handle that here.
msg_type = self._data.get('Type')
if msg_type == 'Notification':
msg_type = self._data.get("Type")
if msg_type == "Notification":
fields_to_sign = [
'Message',
'MessageId',
'Subject',
'Timestamp',
'TopicArn',
'Type',
"Message",
"MessageId",
"Subject",
"Timestamp",
"TopicArn",
"Type",
]
elif (msg_type == 'SubscriptionConfirmation' or
msg_type == 'UnsubscribeConfirmation'):
elif (
msg_type == "SubscriptionConfirmation"
or msg_type == "UnsubscribeConfirmation"
):
fields_to_sign = [
'Message',
'MessageId',
'SubscribeURL',
'Timestamp',
'Token',
'TopicArn',
'Type',
"Message",
"MessageId",
"SubscribeURL",
"Timestamp",
"Token",
"TopicArn",
"Type",
]
else:
# Unrecognized type
Expand All @@ -237,14 +254,14 @@ def _get_bytes_to_sign(self):

def BounceMessageVerifier(*args, **kwargs):
warnings.warn(
'utils.BounceMessageVerifier is deprecated. It is renamed to EventMessageVerifier.',
"utils.BounceMessageVerifier is deprecated. It is renamed to EventMessageVerifier.",
RemovedInDjangoSES20Warning,
)

# parameter name is renamed from bounce_dict to notification.
if 'bounce_dict' in kwargs:
kwargs['notification'] = kwargs['bounce_dict']
del kwargs['bounce_dict']
if "bounce_dict" in kwargs:
kwargs["notification"] = kwargs["bounce_dict"]
del kwargs["bounce_dict"]

return EventMessageVerifier(*args, **kwargs)

Expand All @@ -262,31 +279,32 @@ def verify_bounce_message(msg):
Verify an SES/SNS bounce(event) notification message.
"""
warnings.warn(
'utils.verify_bounce_message is deprecated. It is renamed to verify_event_message.',
"utils.verify_bounce_message is deprecated. It is renamed to verify_event_message.",
RemovedInDjangoSES20Warning,
)
return verify_event_message(msg)


def confirm_sns_subscription(notification):
logger.info(
'Received subscription confirmation: TopicArn: %s',
notification.get('TopicArn'),
"Received subscription confirmation: TopicArn: %s",
notification.get("TopicArn"),
extra={
'notification': notification,
"notification": notification,
},
)

# Get the subscribe url and hit the url to confirm the subscription.
subscribe_url = notification.get('SubscribeURL')
subscribe_url = notification.get("SubscribeURL")
try:
urlopen(subscribe_url).read()
except URLError as e:
# Some kind of error occurred when confirming the request.
logger.error(
'Could not confirm subscription: "%s"', e,
'Could not confirm subscription: "%s"',
e,
extra={
'notification': notification,
"notification": notification,
},
exc_info=True,
)

0 comments on commit b71b5f4

Please sign in to comment.