Skip to content

Commit

Permalink
chore: enforce 2fa for legacy uploaders (pypi#16365)
Browse files Browse the repository at this point in the history
  • Loading branch information
miketheman committed Aug 2, 2024
1 parent 7bb7049 commit 54c1583
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 493 deletions.
11 changes: 11 additions & 0 deletions tests/common/db/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ class UserFactory(WarehouseFactory):
class Meta:
model = User

class Params:
# Shortcut to create a user with a verified primary email
with_verified_primary_email = factory.Trait(
email=factory.RelatedFactory(
"tests.common.db.accounts.EmailFactory",
factory_related_name="user",
primary=True,
verified=True,
)
)

username = factory.Faker("pystr", max_chars=12)
name = factory.Faker("word")
password = "!"
Expand Down
119 changes: 44 additions & 75 deletions tests/unit/email/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def test_doesnt_send_with_unverified(self, primary_email, address):
assert request.task.calls == []
assert task.delay.calls == []

def test_doesnt_send_within_reset_window(self, pyramid_request, pyramid_services):
def test_doesnt_send_within_repeat_window(self, pyramid_request, pyramid_services):
email_service = pretend.stub(
last_sent=pretend.call_recorder(
lambda to, subject: datetime.datetime.now()
Expand All @@ -226,6 +226,49 @@ def test_doesnt_send_within_reset_window(self, pyramid_request, pyramid_services
assert pyramid_request.task.calls == []
assert task.delay.calls == []

def test_sends_when_outside_repeat_window(self, db_request, pyramid_services):
email_service = pretend.stub(
last_sent=pretend.call_recorder(
lambda to, subject: datetime.datetime.now()
- datetime.timedelta(seconds=69)
)
)
pyramid_services.register_service(email_service, IEmailSender, None, name="")

task = pretend.stub(delay=pretend.call_recorder(lambda *a, **kw: None))
db_request.task = pretend.call_recorder(lambda x: task)

user = UserFactory.create(with_verified_primary_email=True)

msg = EmailMessage(subject="My Subject", body_text="My Body")

email._send_email_to_user(
db_request, user, msg, repeat_window=datetime.timedelta(seconds=42)
)

assert db_request.task.calls == [pretend.call(email.send_email)]
assert task.delay.calls == [
pretend.call(
f"{user.name} <{user.primary_email.email}>",
{
"sender": None,
"subject": "My Subject",
"body_text": "My Body",
"body_html": None,
},
{
"tag": "account:email:sent",
"user_id": user.id,
"additional": {
"from_": None,
"to": user.email,
"subject": "My Subject",
"redact_ip": False,
},
},
)
]

@pytest.mark.parametrize(
("username", "primary_email", "address", "expected"),
[
Expand Down Expand Up @@ -1557,80 +1600,6 @@ def test_password_reset_by_admin_email(
]


class Test2FAonUploadEmail:
def test_send_two_factor_not_yet_enabled_email(
self, pyramid_request, pyramid_config, monkeypatch
):
stub_user = pretend.stub(
id="id",
username="username",
name="",
email="email@example.com",
primary_email=pretend.stub(email="email@example.com", verified=True),
has_2fa=False,
)
subject_renderer = pyramid_config.testing_add_renderer(
"email/two-factor-not-yet-enabled/subject.txt"
)
subject_renderer.string_response = "Email Subject"
body_renderer = pyramid_config.testing_add_renderer(
"email/two-factor-not-yet-enabled/body.txt"
)
body_renderer.string_response = "Email Body"
html_renderer = pyramid_config.testing_add_renderer(
"email/two-factor-not-yet-enabled/body.html"
)
html_renderer.string_response = "Email HTML Body"

send_email = pretend.stub(
delay=pretend.call_recorder(lambda *args, **kwargs: None)
)
pyramid_request.task = pretend.call_recorder(lambda *args, **kwargs: send_email)
monkeypatch.setattr(email, "send_email", send_email)

pyramid_request.db = pretend.stub(
query=lambda a: pretend.stub(
filter=lambda *a: pretend.stub(
one=lambda: pretend.stub(user_id=stub_user.id)
)
),
)
pyramid_request.user = stub_user
pyramid_request.registry.settings = {"mail.sender": "noreply@example.com"}

result = email.send_two_factor_not_yet_enabled_email(
pyramid_request,
stub_user,
)

assert result == {"username": stub_user.username}
assert pyramid_request.task.calls == [pretend.call(send_email)]
assert send_email.delay.calls == [
pretend.call(
f"{stub_user.username} <{stub_user.email}>",
{
"sender": None,
"subject": "Email Subject",
"body_text": "Email Body",
"body_html": (
"<html>\n<head></head>\n"
"<body><p>Email HTML Body</p></body>\n</html>\n"
),
},
{
"tag": "account:email:sent",
"user_id": stub_user.id,
"additional": {
"from_": "noreply@example.com",
"to": stub_user.email,
"subject": "Email Subject",
"redact_ip": False,
},
},
)
]


class TestAccountDeletionEmail:
def test_account_deletion_email(
self, pyramid_request, pyramid_config, metrics, monkeypatch
Expand Down
30 changes: 17 additions & 13 deletions tests/unit/forklift/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,9 @@ def test_fails_disallow_new_upload(self, pyramid_config, pyramid_request):
def test_fails_invalid_version(self, pyramid_config, pyramid_request, version):
pyramid_request.POST["protocol_version"] = version
pyramid_request.flags = pretend.stub(enabled=lambda *a: False)
pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/")

user = pretend.stub(primary_email=pretend.stub(verified=True))
user = UserFactory.create(with_verified_primary_email=True)
pyramid_config.testing_securitypolicy(identity=user)
pyramid_request.user = user

Expand Down Expand Up @@ -4248,11 +4249,10 @@ def test_upload_succeeds_with_gpg_signature_field(

assert resp.status_code == 200

def test_upload_succeeds_without_two_factor(
def test_upload_fails_without_two_factor(
self, pyramid_config, db_request, metrics, project_service, monkeypatch
):
user = UserFactory.create(totp_secret=None)
EmailFactory.create(user=user)
user = UserFactory.create(totp_secret=None, with_verified_primary_email=True)

pyramid_config.testing_securitypolicy(identity=user)
db_request.user = user
Expand All @@ -4278,19 +4278,23 @@ def test_upload_succeeds_without_two_factor(
IProjectService: project_service,
}.get(svc)
db_request.user_agent = "warehouse-tests/6.6.6"
db_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/")

send_email = pretend.call_recorder(lambda *a, **kw: None)
monkeypatch.setattr(legacy, "send_two_factor_not_yet_enabled_email", send_email)
with pytest.raises(HTTPBadRequest) as excinfo:
legacy.file_upload(db_request)

resp = legacy.file_upload(db_request)
resp = excinfo.value

assert resp.status_code == 200
assert resp.body == (
b"Two factor authentication is not enabled for your account."
assert resp.status_code == 400
assert resp.status == (
(
"400 User {!r} does not have two-factor authentication enabled. "
"Please enable two-factor authentication before attempting to "
"upload to PyPI. See /the/help/url/ for more information."
).format(user.username)
)

assert send_email.calls == [
pretend.call(db_request, user),
assert db_request.help_url.calls == [
pretend.call(_anchor="two-factor-authentication")
]

@pytest.mark.parametrize(
Expand Down
9 changes: 0 additions & 9 deletions warehouse/email/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,6 @@ def send_account_recovery_initiated_email(
}


@_email(
"two-factor-not-yet-enabled",
allow_unverified=True,
repeat_window=datetime.timedelta(days=14),
)
def send_two_factor_not_yet_enabled_email(request, user):
return {"username": user.username}


@_email("account-deleted")
def send_account_deletion_email(request, user):
return {"username": user.username}
Expand Down
23 changes: 14 additions & 9 deletions warehouse/forklift/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@
from warehouse.authnz import Permissions
from warehouse.classifiers.models import Classifier
from warehouse.constants import MAX_FILESIZE, MAX_PROJECT_SIZE, ONE_GIB, ONE_MIB
from warehouse.email import (
send_api_token_used_in_trusted_publisher_project_email,
send_two_factor_not_yet_enabled_email,
)
from warehouse.email import send_api_token_used_in_trusted_publisher_project_email
from warehouse.events.tags import EventTag
from warehouse.forklift import metadata
from warehouse.forklift.forms import UploadForm, _filetype_extension_mapping
Expand Down Expand Up @@ -519,6 +516,19 @@ def file_upload(request):
project_help=request.help_url(_anchor="verified-email"),
),
) from None
# Ensure user has enabled 2FA before they can upload a file.
if not request.user.has_two_factor:
raise _exc_with_message(
HTTPBadRequest,
(
"User {!r} does not have two-factor authentication enabled. "
"Please enable two-factor authentication before attempting to "
"upload to PyPI. See {project_help} for more information."
).format(
request.user.username,
project_help=request.help_url(_anchor="two-factor-authentication"),
),
) from None

# Do some cleanup of the various form fields
for key in list(request.POST):
Expand Down Expand Up @@ -1257,11 +1267,6 @@ def file_upload(request):
},
)

# Check if the user has any 2FA methods enabled, and if not, email them.
if request.user and not request.user.has_two_factor:
warnings.append("Two factor authentication is not enabled for your account.")
send_two_factor_not_yet_enabled_email(request, request.user)

request.db.flush() # flush db now so server default values are populated for celery

# Push updates to BigQuery
Expand Down
Loading

0 comments on commit 54c1583

Please sign in to comment.