Skip to content

Commit

Permalink
Extracted function to a different file
Browse files Browse the repository at this point in the history
  • Loading branch information
acasajus committed Nov 27, 2024
1 parent bbc25eb commit 6ba0960
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 128 deletions.
57 changes: 56 additions & 1 deletion app/mailbox_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
email_can_be_used_as_mailbox,
send_email,
render,
get_email_domain_part,
)
from app.email_validation import is_valid_email
from app.log import LOG
from app.models import User, Mailbox, Job, MailboxActivation
from app.models import User, Mailbox, Job, MailboxActivation, Alias
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
from app.utils import canonicalize_email


@dataclasses.dataclass
Expand Down Expand Up @@ -328,3 +330,56 @@ def perform_mailbox_email_change(mailbox_id: int) -> MailboxEmailChangeResult:
message="Invalid link",
message_category="error",
)


def __get_alias_mailbox_from_email(
email_address: str, alias: Alias
) -> Optional[Mailbox]:
for mailbox in alias.mailboxes:
if mailbox.email == email_address:
return mailbox

for authorized_address in mailbox.authorized_addresses:
if authorized_address.email == email_address:
LOG.d(
"Found an authorized address for %s %s %s",
alias,
mailbox,
authorized_address,
)
return mailbox
return None


def __get_alias_mailbox_from_email_or_canonical_email(
email_address: str, alias: Alias
) -> Optional[Mailbox]:
# We need to first check for the uncanonicalized version because we still have users in the db with the
# email non canonicalized. So if it matches the already existing one use that, otherwise check the canonical one
mbox = __get_alias_mailbox_from_email(email_address, alias)
if mbox is not None:
return mbox
canonical_email = canonicalize_email(email_address)
if canonical_email != email_address:
return __get_alias_mailbox_from_email(canonical_email, alias)
return None


def get_mailbox_for_reply_phase(
envelope_mail_from: str, header_mail_from: str, alias
) -> Optional[Mailbox]:
"""return the corresponding mailbox given the mail_from and alias
Usually the mail_from=mailbox.email but it can also be one of the authorized address
"""
mbox = __get_alias_mailbox_from_email_or_canonical_email(envelope_mail_from, alias)
if mbox is not None:
return mbox
if not header_mail_from:
return None
envelope_from_domain = get_email_domain_part(envelope_mail_from)
header_from_domain = get_email_domain_part(header_mail_from)
if envelope_from_domain != header_from_domain:
return None
# For services that use VERP sending (envelope from has encoded data to account for bounces)
# if the domain is the same in the header from as the envelope from we can use the header from
return __get_alias_mailbox_from_email_or_canonical_email(header_mail_from, alias)
53 changes: 3 additions & 50 deletions email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
from app.handler.unsubscribe_handler import UnsubscribeHandler
from app.log import LOG, set_message_id
from app.mail_sender import sl_sendmail
from app.mailbox_utils import get_mailbox_for_reply_phase
from app.message_utils import message_to_bytes
from app.models import (
Alias,
Expand All @@ -172,7 +173,7 @@
sign_data,
load_public_key_and_check,
)
from app.utils import sanitize_email, canonicalize_email
from app.utils import sanitize_email
from init_app import load_pgp_public_keys
from server import create_light_app

Expand Down Expand Up @@ -1021,7 +1022,7 @@ def handle_reply(envelope, msg: Message, rcpt_to: str) -> (bool, str):
return False, dmarc_delivery_status

# Anti-spoofing
mailbox = get_mailbox_from_mail_from(
mailbox = get_mailbox_for_reply_phase(
envelope.mail_from, get_header_unicode(msg[headers.FROM]), alias
)
if not mailbox:
Expand Down Expand Up @@ -1368,54 +1369,6 @@ def replace_original_message_id(alias: Alias, email_log: EmailLog, msg: Message)
msg[headers.REFERENCES] = " ".join(new_message_ids)


def get_mailbox_from_mail_from(
envelope_mail_from: str, header_mail_from: str, alias
) -> Optional[Mailbox]:
"""return the corresponding mailbox given the mail_from and alias
Usually the mail_from=mailbox.email but it can also be one of the authorized address
"""

def __check_email(email_address: str, alias: Alias) -> Optional[Mailbox]:
for mailbox in alias.mailboxes:
if mailbox.email == email_address:
return mailbox

for authorized_address in mailbox.authorized_addresses:
if authorized_address.email == email_address:
LOG.d(
"Found an authorized address for %s %s %s",
alias,
mailbox,
authorized_address,
)
return mailbox
return None

def __check_with_canonical(email_address: str, alias: Alias) -> Optional[Mailbox]:
# We need to first check for the uncanonicalized version because we still have users in the db with the
# email non canonicalized. So if it matches the already existing one use that, otherwise check the canonical one
mbox = __check_email(email_address, alias)
if mbox is not None:
return mbox
canonical_email = canonicalize_email(email_address)
if canonical_email != email_address:
return __check_email(canonical_email, alias)
return None

mbox = __check_with_canonical(envelope_mail_from, alias)
if mbox is not None:
return mbox
if not header_mail_from:
return None
envelope_from_domain = get_email_domain_part(envelope_mail_from)
header_from_domain = get_email_domain_part(header_mail_from)
if envelope_from_domain != header_from_domain:
return None
# For services that use VERP sending (envelope from has encoded data to account for bounces)
# if the domain is the same in the header from as the envelope from we can use the header from
return __check_with_canonical(header_mail_from, alias)


def handle_unknown_mailbox(
envelope, msg, reply_email: str, user: User, alias: Alias, contact: Contact
):
Expand Down
75 changes: 0 additions & 75 deletions tests/test_email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,96 +14,21 @@
from app.mail_sender import mail_sender
from app.models import (
Alias,
AuthorizedAddress,
IgnoredEmail,
EmailLog,
Notification,
VerpType,
Contact,
SentAlert,
Mailbox,
)
from app.utils import random_string, canonicalize_email
from email_handler import (
get_mailbox_from_mail_from,
should_ignore,
is_automatic_out_of_office,
)
from tests.utils import load_eml_file, create_new_user, random_email


def test_get_mailbox_from_mail_from(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()

mb = get_mailbox_from_mail_from(user.email, "", alias)
assert mb.email == user.email

mb = get_mailbox_from_mail_from("unauthorized@gmail.com", "", alias)
assert mb is None

# authorized address
AuthorizedAddress.create(
user_id=user.id,
mailbox_id=user.default_mailbox_id,
email="unauthorized@gmail.com",
commit=True,
)
mb = get_mailbox_from_mail_from("unauthorized@gmail.com", "", alias)
assert mb.email == user.email


def test_get_mailbox_from_mail_from_for_canonical_email(flask_client):
prefix = random_string(10)
email = f"{prefix}+subaddresxs@gmail.com"
canonical_email = canonicalize_email(email)
assert canonical_email != email

user = create_new_user()
mbox = Mailbox.create(
email=canonical_email, user_id=user.id, verified=True, flush=True
)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_from_mail_from(email, "", alias)
assert mb.email == canonical_email

mb = get_mailbox_from_mail_from(canonical_email, "", alias)
assert mb.email == canonical_email


def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_aligned(
flask_client,
):
domain = f"{random_string(10)}.com"
envelope_from = f"envelope_verp@{domain}"
mail_from = f"mail_from@{domain}"
user = create_new_user()
mbox = Mailbox.create(email=mail_from, user_id=user.id, verified=True, flush=True)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_from_mail_from(envelope_from, mail_from, alias)
assert mb.email == mail_from


def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_not_aligned(
flask_client,
):
domain = f"{random_string(10)}.com"
envelope_from = f"envelope_verp@{domain}"
mail_from = f"mail_from@other_{domain}"
user = create_new_user()
mbox = Mailbox.create(email=mail_from, user_id=user.id, verified=True, flush=True)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_from_mail_from(envelope_from, mail_from, alias)
assert mb is None


def test_should_ignore(flask_client):
assert should_ignore("mail_from", []) is False

Expand Down
85 changes: 83 additions & 2 deletions tests/test_mailbox_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,18 @@
from app import mailbox_utils, config
from app.db import Session
from app.mail_sender import mail_sender
from app.mailbox_utils import MailboxEmailChangeError
from app.models import Mailbox, MailboxActivation, User, Job, UserAuditLog
from app.mailbox_utils import MailboxEmailChangeError, get_mailbox_for_reply_phase
from app.models import (
Mailbox,
MailboxActivation,
User,
Job,
UserAuditLog,
Alias,
AuthorizedAddress,
)
from app.user_audit_log_utils import UserAuditLogAction
from app.utils import random_string, canonicalize_email
from tests.utils import create_new_user, random_email


Expand Down Expand Up @@ -418,3 +427,75 @@ def test_perform_mailbox_email_change_success():
user_id=user.id, action=UserAuditLogAction.UpdateMailbox.value
).count()
assert audit_log_entries == 1


def test_get_mailbox_from_mail_from(flask_client):
user = create_new_user()
alias = Alias.create_new_random(user)
Session.commit()

mb = get_mailbox_for_reply_phase(user.email, "", alias)
assert mb.email == user.email

mb = get_mailbox_for_reply_phase("unauthorized@gmail.com", "", alias)
assert mb is None

# authorized address
AuthorizedAddress.create(
user_id=user.id,
mailbox_id=user.default_mailbox_id,
email="unauthorized@gmail.com",
commit=True,
)
mb = get_mailbox_for_reply_phase("unauthorized@gmail.com", "", alias)
assert mb.email == user.email


def test_get_mailbox_from_mail_from_for_canonical_email(flask_client):
prefix = random_string(10)
email = f"{prefix}+subaddresxs@gmail.com"
canonical_email = canonicalize_email(email)
assert canonical_email != email

user = create_new_user()
mbox = Mailbox.create(
email=canonical_email, user_id=user.id, verified=True, flush=True
)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_for_reply_phase(email, "", alias)
assert mb.email == canonical_email

mb = get_mailbox_for_reply_phase(canonical_email, "", alias)
assert mb.email == canonical_email


def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_aligned(
flask_client,
):
domain = f"{random_string(10)}.com"
envelope_from = f"envelope_verp@{domain}"
mail_from = f"mail_from@{domain}"
user = create_new_user()
mbox = Mailbox.create(email=mail_from, user_id=user.id, verified=True, flush=True)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias)
assert mb.email == mail_from


def test_get_mailbox_from_mail_from_coming_from_header_if_domain_is_not_aligned(
flask_client,
):
domain = f"{random_string(10)}.com"
envelope_from = f"envelope_verp@{domain}"
mail_from = f"mail_from@other_{domain}"
user = create_new_user()
mbox = Mailbox.create(email=mail_from, user_id=user.id, verified=True, flush=True)
alias = Alias.create(user_id=user.id, email=random_email(), mailbox_id=mbox.id)
Session.flush()

mb = get_mailbox_for_reply_phase(envelope_from, mail_from, alias)
assert mb is None

0 comments on commit 6ba0960

Please sign in to comment.