Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sanitize Input (GSI-900) #14

Merged
merged 6 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pyproject_generation/pyproject_custom.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[project]
name = "ns"
version = "1.1.1"
version = "1.1.2"
description = "The Notification Service (NS) handles notification kafka events."
dependencies = [
"typer>=0.9.0",
"typer>=0.12",
"ghga-event-schemas>=3.0.0, <4",
"ghga-service-commons>=3.0.0",
"hexkit[akafka,mongodb]>=3.0.0",
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@ We recommend using the provided Docker container.

A pre-build version is available at [docker hub](https://hub.docker.com/repository/docker/ghga/notification-service):
```bash
docker pull ghga/notification-service:1.1.1
docker pull ghga/notification-service:1.1.2
```

Or you can build the container yourself from the [`./Dockerfile`](./Dockerfile):
```bash
# Execute in the repo's root dir:
docker build -t ghga/notification-service:1.1.1 .
docker build -t ghga/notification-service:1.1.2 .
```

For production-ready deployment, we recommend using Kubernetes, however,
for simple use cases, you could execute the service using docker
on a single server:
```bash
# The entrypoint is preconfigured:
docker run -p 8080:8080 ghga/notification-service:1.1.1 --help
docker run -p 8080:8080 ghga/notification-service:1.1.2 --help
```

If you prefer not to use containers, you may install the service from source:
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ classifiers = [
"Intended Audience :: Developers",
]
name = "ns"
version = "1.1.1"
version = "1.1.2"
description = "The Notification Service (NS) handles notification kafka events."
dependencies = [
"typer>=0.9.0",
"typer>=0.12",
"ghga-event-schemas>=3.0.0, <4",
"ghga-service-commons>=3.0.0",
"hexkit[akafka,mongodb]>=3.0.0",
Expand Down
82 changes: 46 additions & 36 deletions src/ns/core/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#
"""Contains the concrete implementation of a NotifierPort"""

import html
import logging
from contextlib import suppress
from email.message import EmailMessage
Expand Down Expand Up @@ -115,60 +116,69 @@ async def send_notification(self, *, notification: event_schemas.Notification):
notification_record.sent = True
await self._notification_record_dao.update(dto=notification_record)

def _construct_email(
self, *, notification: event_schemas.Notification
) -> EmailMessage:
"""Constructs an EmailMessage object from the contents of an email notification event"""
message = EmailMessage()
message["To"] = notification.recipient_email
message["Cc"] = notification.email_cc
message["Bcc"] = notification.email_bcc
message["Subject"] = notification.subject
message["From"] = self._config.from_address
def _build_email_subtype(
self, *, template_type: EmailTemplateType, values_dict: dict[str, str]
mephenor marked this conversation as resolved.
Show resolved Hide resolved
):
"""Builds an email message subtype (HTML or plaintext) from a template and
a dictionary of values.
"""
template_str = (
self._config.plaintext_email_template
if template_type == EmailTemplateType.PLAINTEXT
else self._config.html_email_template
)

payload_as_dict = notification.model_dump()
# Load the string as a python string template
template = Template(template_str)

# create plaintext html with template
plaintext_template = Template(self._config.plaintext_email_template)
# Try to substitute the values into the template
try:
plaintext_email = plaintext_template.substitute(payload_as_dict)
email_subtype = template.substitute(values_dict)
except KeyError as err:
template_var_error = self.VariableNotSuppliedError(variable=err.args[0])
log.critical(template_var_error, extra={"variable": err.args[0]})
raise template_var_error from err
except ValueError as err:
template_format_error = self.BadTemplateFormat(
template_type=EmailTemplateType.PLAINTEXT, problem=err.args[0]
template_type=template_type, problem=err.args[0]
)
log.critical(
template_format_error,
extra={
"template_type": EmailTemplateType.PLAINTEXT,
"problem": err.args[0],
},
extra={"template_type": template_type, "problem": err.args[0]},
)
raise template_format_error from err
return email_subtype

def _construct_email(
self, *, notification: event_schemas.Notification
) -> EmailMessage:
"""Constructs an EmailMessage object from the contents of an email notification event"""
message = EmailMessage()
message["To"] = notification.recipient_email
message["Cc"] = notification.email_cc
message["Bcc"] = notification.email_bcc
message["Subject"] = notification.subject
message["From"] = self._config.from_address

payload_as_dict = {**notification.model_dump()}

# create plaintext html with template
plaintext_email = self._build_email_subtype(
template_type=EmailTemplateType.PLAINTEXT, values_dict=payload_as_dict
)
message.set_content(plaintext_email)

# create html version of email, replacing variables of $var format
html_template = Template(self._config.html_email_template)
# Escape values exposed to the email in case they've been maliciously crafted
for k, v in payload_as_dict.items():
if isinstance(v, list):
payload_as_dict[k] = ", ".join([html.escape(_) for _ in v])
mephenor marked this conversation as resolved.
Show resolved Hide resolved
else:
payload_as_dict[k] = html.escape(v)

try:
html_email = html_template.substitute(payload_as_dict)
except KeyError as err:
template_var_error = self.VariableNotSuppliedError(variable=err.args[0])
log.critical(template_var_error, extra={"variable": err.args[0]})
raise template_var_error from err
except ValueError as err:
template_format_error = self.BadTemplateFormat(
template_type=EmailTemplateType.HTML, problem=err.args[0]
)
log.critical(
template_format_error,
extra={"template_type": EmailTemplateType.HTML, "problem": err.args[0]},
)
raise template_format_error from err
# create html version of email, replacing variables of $var format
html_email = self._build_email_subtype(
template_type=EmailTemplateType.HTML, values_dict=payload_as_dict
)

# add the html version to the EmailMessage object
message.add_alternative(html_email, subtype=EmailTemplateType.HTML)
Expand Down
50 changes: 50 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

"""Test basic event consumption"""

import html
from hashlib import sha256
from typing import cast

Expand Down Expand Up @@ -237,3 +238,52 @@ async def test_idempotence_and_transmission(joint_fixture: JointFixture):
# is bogus. If no ConnectionError is raised, that means the Notifier didn't try
# to send an email.
await joint_fixture.event_subscriber.run(forever=False)


async def test_html_escaping(joint_fixture: JointFixture):
"""Make sure dirty args are escaped properly in the HTML email and unchanged in the
plaintext email.
"""
# Cast notifier type
joint_fixture.notifier = cast(Notifier, joint_fixture.notifier)

original_body = "<script>alert('danger');</script>"
original_name = f"<p>{sample_notification["recipient_name"]}</p>"
injected_notification = {**sample_notification}
injected_notification["plaintext_body"] = original_body
injected_notification["recipient_name"] = original_name

# Precompute the escaped values and make sure they're not the same as the original
escaped_name = html.escape(original_name)
escaped_body = html.escape(original_body)
assert original_name != escaped_name
assert original_body != escaped_body

notification = make_notification(injected_notification)

msg = joint_fixture.notifier._construct_email(notification=notification)
assert msg is not None

plaintext_body = msg.get_body(preferencelist="plain")
assert plaintext_body is not None

plaintext_content = plaintext_body.get_content() # type: ignore
expected_plaintext = (
f"Dear {original_name},\n\n{original_body}\n"
+ "\nWarm regards,\n\nThe GHGA Team"
)
assert plaintext_content.strip() == expected_plaintext

html_body = msg.get_body(preferencelist="html")
assert html_body is not None

html_content = html_body.get_content() # type: ignore
assert html_content is not None

expected_html = (
'<!DOCTYPE html><html><head></head><body style="color: #00393f;'
+ f'padding: 12px;"><h2>Dear {escaped_name},</h2><p>{escaped_body}</p>'
+ "<p>Warm regards,</p><h3>The GHGA Team</h3>"
+ "</body></html>"
)
assert html_content.strip() == expected_html