Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PEP 503 rules to validate upload filename #10072

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ lxml
mistune
msgpack
natsort
packaging>=15.2
packaging>=20.9
paginate>=0.5.2
paginate_sqlalchemy
passlib>=1.6.4
Expand Down
154 changes: 148 additions & 6 deletions tests/unit/forklift/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from cgi import FieldStorage
from unittest import mock

import pkg_resources
import pretend
import pytest
import requests
Expand Down Expand Up @@ -669,6 +668,45 @@ def test_is_duplicate_true(self, pyramid_config, db_request):

assert legacy._is_duplicate_file(db_request.db, filename, hashes)

@pytest.mark.parametrize("file_name_prefix", ["foo_bar", "foo.bar"])
@pytest.mark.parametrize("project_name", ["foo_bar", "foo.bar"])
def test_is_duplicate_true_non_normalized_filename(
self, pyramid_config, db_request, file_name_prefix, project_name
):
pyramid_config.testing_securitypolicy(userid=1)

user = UserFactory.create()
EmailFactory.create(user=user)
project = ProjectFactory.create(name=project_name)
release = ReleaseFactory.create(project=project, version="1.0")
RoleFactory.create(user=user, project=project)

filename = "{}-{}.tar.gz".format(project_name, release.version)
file_content = io.BytesIO(_TAR_GZ_PKG_TESTDATA)
file_value = file_content.getvalue()

hashes = {
"sha256": hashlib.sha256(file_value).hexdigest(),
"md5": hashlib.md5(file_value).hexdigest(),
"blake2_256": hashlib.blake2b(file_value, digest_size=256 // 8).hexdigest(),
}
db_request.db.add(
File(
release=release,
filename=filename,
md5_digest=hashes["md5"],
sha256_digest=hashes["sha256"],
blake2_256_digest=hashes["blake2_256"],
path="source/{name[0]}/{name}/{filename}".format(
name=project.name, filename=filename
),
)
)

duplicate_filename = "{}-{}.tar.gz".format(file_name_prefix, release.version)

assert legacy._is_duplicate_file(db_request.db, duplicate_filename, hashes)

def test_is_duplicate_none(self, pyramid_config, db_request):
pyramid_config.testing_securitypolicy(userid=1)

Expand Down Expand Up @@ -2412,7 +2450,7 @@ def test_upload_fails_with_diff_filename_same_blake2(
"filetype": "sdist",
"md5_digest": hashlib.md5(file_content.getvalue()).hexdigest(),
"content": pretend.stub(
filename="{}-fake.tar.gz".format(project.name),
filename="{}-0.1.tar.gz".format(project.name),
file=file_content,
type="application/tar",
),
Expand Down Expand Up @@ -2446,7 +2484,14 @@ def test_upload_fails_with_diff_filename_same_blake2(
"400 File already exists. See /the/help/url/ for more information."
)

def test_upload_fails_with_wrong_filename(self, pyramid_config, db_request):
@pytest.mark.parametrize(
"filename",
[
"nope-{version}.tar.gz",
"nope-{version}-py3-none-any.whl",
],
)
def test_upload_fails_with_wrong_prefix(self, pyramid_config, db_request, filename):
pyramid_config.testing_securitypolicy(userid=1)

user = UserFactory.create()
Expand All @@ -2456,7 +2501,7 @@ def test_upload_fails_with_wrong_filename(self, pyramid_config, db_request):
release = ReleaseFactory.create(project=project, version="1.0")
RoleFactory.create(user=user, project=project)

filename = "nope-{}.tar.gz".format(release.version)
filename = filename.format(version=release.version)

db_request.POST = MultiDict(
{
Expand All @@ -2480,11 +2525,108 @@ def test_upload_fails_with_wrong_filename(self, pyramid_config, db_request):

assert resp.status_code == 400
assert resp.status == (
"400 Start filename for {!r} with {!r}.".format(
project.name, pkg_resources.safe_name(project.name).lower()
"400 Invalid filename: Start filename for project {!r} with {!r}.".format(
project.name, project.normalized_name.replace("-", "_")
)
)

def test_wheel_upload_passes_with_period_in_prefix(
self, pyramid_config, db_request, metrics, monkeypatch
):
"""
This tests existing behavior which allows periods in the project name,
which violates the wheel spec at [1], which requires that the filename
start with the PEP 503 normalization of the project name, followed by
replacing - with _.

[1] https://packaging.python.org/specifications/binary-distribution-format/
"""
pyramid_config.testing_securitypolicy(userid=1)

user = UserFactory.create()
db_request.user = user
EmailFactory.create(user=user)
project = ProjectFactory.create(name="some.project")
release = ReleaseFactory.create(project=project, version="1.0")
RoleFactory.create(user=user, project=project)

filename = "{project_name}-{version}-py3-none-any.whl".format(
version=release.version, project_name=project.name
)
monkeypatch.setattr(legacy, "_is_valid_dist_file", lambda *a, **kw: True)

db_request.find_service = pretend.call_recorder(
lambda svc, name=None, context=None: {
IFileStorage: pretend.stub(store=lambda *a, **kw: None),
IMetricsService: metrics,
}.get(svc)
)

db_request.user_agent = "warehouse-tests/6.6.6"
db_request.POST = MultiDict(
{
"metadata_version": "1.2",
"name": project.name,
"version": release.version,
"filetype": "bdist_wheel",
"pyversion": "py3",
"md5_digest": _TAR_GZ_PKG_MD5,
"content": pretend.stub(
filename=filename,
file=io.BytesIO(_TAR_GZ_PKG_TESTDATA),
type="application/tar",
),
}
)

resp = legacy.file_upload(db_request)

assert resp.status_code == 200

@pytest.mark.parametrize(
"filename",
[
"nope-notaversion.tar.gz",
"nope-notaversion-py3-none-any.whl",
],
)
def test_upload_fails_with_invalid_version(
self, pyramid_config, db_request, filename
):
pyramid_config.testing_securitypolicy(userid=1)

user = UserFactory.create()
db_request.user = user
EmailFactory.create(user=user)
project = ProjectFactory.create()
release = ReleaseFactory.create(project=project, version="1.0")
RoleFactory.create(user=user, project=project)

filename = filename.format(version=release.version)

db_request.POST = MultiDict(
{
"metadata_version": "1.2",
"name": project.name,
"version": release.version,
"filetype": "sdist",
"md5_digest": "nope!",
"content": pretend.stub(
filename=filename,
file=io.BytesIO(b"a" * (legacy.MAX_FILESIZE + 1)),
type="application/tar",
),
}
)

with pytest.raises(HTTPBadRequest) as excinfo:
legacy.file_upload(db_request)

resp = excinfo.value

assert resp.status_code == 400
assert resp.status == "400 Invalid filename: Invalid version: 'notaversion'."

def test_upload_fails_with_invalid_extension(self, pyramid_config, db_request):
pyramid_config.testing_securitypolicy(userid=1)

Expand Down
41 changes: 34 additions & 7 deletions warehouse/forklift/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import packaging.specifiers
import packaging.utils
import packaging.version
import pkg_resources
import requests
import stdlib_list
import wtforms
Expand Down Expand Up @@ -737,6 +736,15 @@ def _is_valid_dist_file(filename, filetype):
return True


def _parse_filename(filename):
if filename.endswith(".whl"):
parse_func = packaging.utils.parse_wheel_filename
else:
parse_func = packaging.utils.parse_sdist_filename

return parse_func(filename)


def _is_duplicate_file(db_session, filename, hashes):
"""
Check to see if file already exists, and if it's content matches.
Expand All @@ -760,7 +768,8 @@ def _is_duplicate_file(db_session, filename, hashes):

if file_ is not None:
return (
file_.filename == filename
# This has the effect of canonicalizing the project name and version
_parse_filename(file_.filename) == _parse_filename(filename)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the intention here to ensure that I can't upload foo_bar-0.1.0.tar.gz if foo.bar-0.1.0.tar.gz already exists?

If so, I don't think this quite handles it. In the query above we filter on if the filenames OR hashes collide.

In the test for this change, we always collide hashes... and I'm pretty sure it would fail to catch the "collision" marked above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s also not quite easy to check for non-canonicalized file names from the database here, at least not without some schema shakeup. This is sort of a “best effort” to detect the user uploading the same file; duplicates are still possible, but they wouldn’t be that problematic and it’s not worth it to eliminate them.

and file_.sha256_digest == hashes["sha256"]
and file_.md5_digest == hashes["md5"]
and file_.blake2_256_digest == hashes["blake2_256"]
Expand Down Expand Up @@ -1229,13 +1238,31 @@ def file_upload(request):
"for more information.",
)

# Make sure that our filename matches the project that it is being uploaded
# to.
prefix = pkg_resources.safe_name(project.name).lower()
if not pkg_resources.safe_name(filename).lower().startswith(prefix):
try:
parsed_filename_parts = _parse_filename(filename)
except (
packaging.utils.InvalidSdistFilename,
packaging.utils.InvalidWheelFilename,
packaging.version.InvalidVersion,
) as e:
raise _exc_with_message(HTTPBadRequest, f"Invalid filename: {e}.")

# Make sure that the normalized version of the project name in the filename
# matches the normalized project name that it is being uploaded to.
#
# NB: This allows periods the project name, which violates the wheel spec
# at [1], which requires that the filename start with the PEP 503
# normalization of the project name, followed by replacing - with _.
#
# [1] https://packaging.python.org/specifications/binary-distribution-format/
if not parsed_filename_parts[0] == project.normalized_name:
escaped_normalized_project_name = project.normalized_name.replace("-", "_")
help_url = request.help_url(_anchor="invalid-file-name")
raise _exc_with_message(
HTTPBadRequest,
"Start filename for {!r} with {!r}.".format(project.name, prefix),
f"Invalid filename: Start filename for project {project.name!r} "
f"with {escaped_normalized_project_name!r}. "
f"See {help_url} for more information",
)

# Check the content type of what is being uploaded
Expand Down
Loading