Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: enforce 2fa for legacy uploaders #16365

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions tests/common/db/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ class UserFactory(WarehouseFactory):
class Meta:
model = User

class Params:
# Shortcut to create a user with a verified primary email
with_verified_primary_email = factory.Trait(
email=factory.RelatedFactory(
"tests.common.db.accounts.EmailFactory",
factory_related_name="user",
primary=True,
verified=True,
)
)

username = factory.Faker("pystr", max_chars=12)
name = factory.Faker("word")
password = "!"
Expand Down
119 changes: 44 additions & 75 deletions tests/unit/email/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def test_doesnt_send_with_unverified(self, primary_email, address):
assert request.task.calls == []
assert task.delay.calls == []

def test_doesnt_send_within_reset_window(self, pyramid_request, pyramid_services):
def test_doesnt_send_within_repeat_window(self, pyramid_request, pyramid_services):
email_service = pretend.stub(
last_sent=pretend.call_recorder(
lambda to, subject: datetime.datetime.now()
Expand All @@ -226,6 +226,49 @@ def test_doesnt_send_within_reset_window(self, pyramid_request, pyramid_services
assert pyramid_request.task.calls == []
assert task.delay.calls == []

def test_sends_when_outside_repeat_window(self, db_request, pyramid_services):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add this test case since the repeat_window logic wasn't exercised directly, only via another email template test.

email_service = pretend.stub(
last_sent=pretend.call_recorder(
lambda to, subject: datetime.datetime.now()
- datetime.timedelta(seconds=69)
)
)
pyramid_services.register_service(email_service, IEmailSender, None, name="")

task = pretend.stub(delay=pretend.call_recorder(lambda *a, **kw: None))
db_request.task = pretend.call_recorder(lambda x: task)

user = UserFactory.create(with_verified_primary_email=True)

msg = EmailMessage(subject="My Subject", body_text="My Body")

email._send_email_to_user(
db_request, user, msg, repeat_window=datetime.timedelta(seconds=42)
)

assert db_request.task.calls == [pretend.call(email.send_email)]
assert task.delay.calls == [
pretend.call(
f"{user.name} <{user.primary_email.email}>",
{
"sender": None,
"subject": "My Subject",
"body_text": "My Body",
"body_html": None,
},
{
"tag": "account:email:sent",
"user_id": user.id,
"additional": {
"from_": None,
"to": user.email,
"subject": "My Subject",
"redact_ip": False,
},
},
)
]

@pytest.mark.parametrize(
("username", "primary_email", "address", "expected"),
[
Expand Down Expand Up @@ -1557,80 +1600,6 @@ def test_password_reset_by_admin_email(
]


class Test2FAonUploadEmail:
def test_send_two_factor_not_yet_enabled_email(
self, pyramid_request, pyramid_config, monkeypatch
):
stub_user = pretend.stub(
id="id",
username="username",
name="",
email="email@example.com",
primary_email=pretend.stub(email="email@example.com", verified=True),
has_2fa=False,
)
subject_renderer = pyramid_config.testing_add_renderer(
"email/two-factor-not-yet-enabled/subject.txt"
)
subject_renderer.string_response = "Email Subject"
body_renderer = pyramid_config.testing_add_renderer(
"email/two-factor-not-yet-enabled/body.txt"
)
body_renderer.string_response = "Email Body"
html_renderer = pyramid_config.testing_add_renderer(
"email/two-factor-not-yet-enabled/body.html"
)
html_renderer.string_response = "Email HTML Body"

send_email = pretend.stub(
delay=pretend.call_recorder(lambda *args, **kwargs: None)
)
pyramid_request.task = pretend.call_recorder(lambda *args, **kwargs: send_email)
monkeypatch.setattr(email, "send_email", send_email)

pyramid_request.db = pretend.stub(
query=lambda a: pretend.stub(
filter=lambda *a: pretend.stub(
one=lambda: pretend.stub(user_id=stub_user.id)
)
),
)
pyramid_request.user = stub_user
pyramid_request.registry.settings = {"mail.sender": "noreply@example.com"}

result = email.send_two_factor_not_yet_enabled_email(
pyramid_request,
stub_user,
)

assert result == {"username": stub_user.username}
assert pyramid_request.task.calls == [pretend.call(send_email)]
assert send_email.delay.calls == [
pretend.call(
f"{stub_user.username} <{stub_user.email}>",
{
"sender": None,
"subject": "Email Subject",
"body_text": "Email Body",
"body_html": (
"<html>\n<head></head>\n"
"<body><p>Email HTML Body</p></body>\n</html>\n"
),
},
{
"tag": "account:email:sent",
"user_id": stub_user.id,
"additional": {
"from_": "noreply@example.com",
"to": stub_user.email,
"subject": "Email Subject",
"redact_ip": False,
},
},
)
]


class TestAccountDeletionEmail:
def test_account_deletion_email(
self, pyramid_request, pyramid_config, metrics, monkeypatch
Expand Down
30 changes: 17 additions & 13 deletions tests/unit/forklift/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,9 @@ def test_fails_disallow_new_upload(self, pyramid_config, pyramid_request):
def test_fails_invalid_version(self, pyramid_config, pyramid_request, version):
pyramid_request.POST["protocol_version"] = version
pyramid_request.flags = pretend.stub(enabled=lambda *a: False)
pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/")

user = pretend.stub(primary_email=pretend.stub(verified=True))
user = UserFactory.create(with_verified_primary_email=True)
pyramid_config.testing_securitypolicy(identity=user)
pyramid_request.user = user

Expand Down Expand Up @@ -4248,11 +4249,10 @@ def test_upload_succeeds_with_gpg_signature_field(

assert resp.status_code == 200

def test_upload_succeeds_without_two_factor(
def test_upload_fails_without_two_factor(
self, pyramid_config, db_request, metrics, project_service, monkeypatch
):
user = UserFactory.create(totp_secret=None)
EmailFactory.create(user=user)
user = UserFactory.create(totp_secret=None, with_verified_primary_email=True)

pyramid_config.testing_securitypolicy(identity=user)
db_request.user = user
Expand All @@ -4278,19 +4278,23 @@ def test_upload_succeeds_without_two_factor(
IProjectService: project_service,
}.get(svc)
db_request.user_agent = "warehouse-tests/6.6.6"
db_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/")

send_email = pretend.call_recorder(lambda *a, **kw: None)
monkeypatch.setattr(legacy, "send_two_factor_not_yet_enabled_email", send_email)
with pytest.raises(HTTPBadRequest) as excinfo:
legacy.file_upload(db_request)

resp = legacy.file_upload(db_request)
resp = excinfo.value

assert resp.status_code == 200
assert resp.body == (
b"Two factor authentication is not enabled for your account."
assert resp.status_code == 400
assert resp.status == (
(
"400 User {!r} does not have two-factor authentication enabled. "
"Please enable two-factor authentication before attempting to "
"upload to PyPI. See /the/help/url/ for more information."
).format(user.username)
)

assert send_email.calls == [
pretend.call(db_request, user),
assert db_request.help_url.calls == [
pretend.call(_anchor="two-factor-authentication")
]

@pytest.mark.parametrize(
Expand Down
9 changes: 0 additions & 9 deletions warehouse/email/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,6 @@ def send_account_recovery_initiated_email(
}


@_email(
"two-factor-not-yet-enabled",
allow_unverified=True,
repeat_window=datetime.timedelta(days=14),
)
def send_two_factor_not_yet_enabled_email(request, user):
return {"username": user.username}


@_email("account-deleted")
def send_account_deletion_email(request, user):
return {"username": user.username}
Expand Down
23 changes: 14 additions & 9 deletions warehouse/forklift/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@
from warehouse.authnz import Permissions
from warehouse.classifiers.models import Classifier
from warehouse.constants import MAX_FILESIZE, MAX_PROJECT_SIZE, ONE_GIB, ONE_MIB
from warehouse.email import (
send_api_token_used_in_trusted_publisher_project_email,
send_two_factor_not_yet_enabled_email,
)
from warehouse.email import send_api_token_used_in_trusted_publisher_project_email
from warehouse.events.tags import EventTag
from warehouse.forklift import metadata
from warehouse.forklift.forms import UploadForm, _filetype_extension_mapping
Expand Down Expand Up @@ -519,6 +516,19 @@ def file_upload(request):
project_help=request.help_url(_anchor="verified-email"),
),
) from None
# Ensure user has enabled 2FA before they can upload a file.
if not request.user.has_two_factor:
miketheman marked this conversation as resolved.
Show resolved Hide resolved
raise _exc_with_message(
HTTPBadRequest,
(
"User {!r} does not have two-factor authentication enabled. "
"Please enable two-factor authentication before attempting to "
"upload to PyPI. See {project_help} for more information."
).format(
request.user.username,
project_help=request.help_url(_anchor="two-factor-authentication"),
),
) from None

# Do some cleanup of the various form fields
for key in list(request.POST):
Expand Down Expand Up @@ -1257,11 +1267,6 @@ def file_upload(request):
},
)

# Check if the user has any 2FA methods enabled, and if not, email them.
if request.user and not request.user.has_two_factor:
warnings.append("Two factor authentication is not enabled for your account.")
send_two_factor_not_yet_enabled_email(request, request.user)

request.db.flush() # flush db now so server default values are populated for celery

# Push updates to BigQuery
Expand Down
Loading