|
| 1 | +""" |
| 2 | +Extended MetadataInjector that supports sdist (.tar.gz) and zip (.zip) formats. |
| 3 | +
|
| 4 | +This extends SimpleRepository's MetadataInjectorRepository to provide metadata extraction |
| 5 | +for package formats beyond wheels. |
| 6 | +
|
| 7 | +""" |
| 8 | + |
1 | 9 | from dataclasses import replace |
2 | 10 | import pathlib |
3 | 11 | import tarfile |
| 12 | +import typing |
4 | 13 | import zipfile |
5 | 14 |
|
6 | 15 | from simple_repository import model |
7 | 16 | from simple_repository.components.metadata_injector import MetadataInjectorRepository |
8 | 17 |
|
9 | 18 |
|
10 | | -def get_metadata_from_sdist(package_path: pathlib.Path) -> str: |
11 | | - archive = tarfile.TarFile.open(package_path) |
12 | | - names = archive.getnames() |
def _extract_pkg_info_from_archive(
    archive_names: typing.List[str],
    extract_func: typing.Callable[[str], typing.Optional[typing.IO[bytes]]],
    package_name: str,
) -> str:
    """
    Extract PKG-INFO metadata from an archive.

    Every archive member whose path contains "PKG-INFO" is a candidate.
    Candidates are tried from the deepest path (most "/"-separated
    components) to the shallowest, and the first one whose UTF-8 decoded
    content contains "Metadata-Version" is returned.

    Args:
        archive_names: List of file names in the archive
        extract_func: Function to extract a file from the archive; a return
            value of None means the member cannot be read and is skipped
        package_name: Name of the package for error messages

    Returns:
        Metadata content as string

    Raises:
        ValueError: If no valid metadata is found
    """
    pkg_info_files = [x.split("/") for x in archive_names if "PKG-INFO" in x]
    # Sort by path depth (descending) to prefer more specific/nested metadata files
    ordered_pkg_info = sorted(pkg_info_files, key=lambda pth: -len(pth))

    for path in ordered_pkg_info:
        candidate = "/".join(path)
        f = extract_func(candidate)
        if f is None:
            continue
        # Close every member handle once it has been read, whether or not it
        # turns out to hold usable metadata, so handles are not leaked.
        with f:
            try:
                data = f.read().decode("utf-8")
            except (UnicodeDecodeError, OSError):
                # Skip files that can't be decoded or read
                continue
        if "Metadata-Version" in data:
            return data

    raise ValueError(f"No valid PKG-INFO metadata found in {package_name}")
| 56 | + |
| 57 | + |
def get_metadata_from_sdist(package_path: pathlib.Path) -> str:
    """Read the PKG-INFO metadata out of a source distribution (.tar.gz file)."""
    # The archive must stay open while the helper pulls members out of it,
    # so the whole lookup happens inside the ``with`` block.
    with tarfile.open(package_path) as sdist:
        member_names = sdist.getnames()
        return _extract_pkg_info_from_archive(
            member_names, sdist.extractfile, package_path.name
        )
26 | 67 |
|
27 | 68 |
|
def get_metadata_from_zip(package_path: pathlib.Path) -> str:
    """Read the PKG-INFO metadata out of a zip file (legacy format, used by packages like pyreadline)."""
    with zipfile.ZipFile(package_path) as zipped:

        def open_member(member_name: str) -> typing.Optional[typing.IO[bytes]]:
            # A member that is missing or cannot be opened is reported as
            # None so that the shared helper moves on to the next candidate.
            try:
                member = zipped.open(member_name, mode="r")
            except (KeyError, zipfile.BadZipFile):
                return None
            return member

        return _extract_pkg_info_from_archive(
            zipped.namelist(), open_member, package_path.name
        )
45 | 81 |
|
46 | 82 |
|
class MetadataInjector(MetadataInjectorRepository):
    """
    Extended MetadataInjector that supports multiple package formats.

    Metadata extraction is dispatched on the package file name's extension:
    - Wheel files (.whl) - delegated to ``self._get_metadata_from_wheel``
    - Source distributions (.tar.gz) - read from the PKG-INFO file
    - Zip files (.zip) - legacy format used by some packages
    """

    # Map of supported file extensions to their extraction functions. Each
    # callable receives the injector instance and the package path.
    _EXTRACTORS: typing.ClassVar[
        typing.Dict[str, typing.Callable[["MetadataInjector", pathlib.Path], str]]
    ] = {
        ".whl": lambda self, path: self._get_metadata_from_wheel(path),
        ".tar.gz": lambda self, path: get_metadata_from_sdist(path),
        ".zip": lambda self, path: get_metadata_from_zip(path),
    }

    def _get_metadata_from_package(self, package_path: pathlib.Path) -> str:
        """
        Extract metadata from a package file based on its extension.

        Args:
            package_path: Path of the package file on disk.

        Returns:
            Metadata content as string.

        Raises:
            ValueError: If the file name ends with none of the supported
                extensions.
        """
        package_name = package_path.name

        for extension, extractor in self._EXTRACTORS.items():
            if package_name.endswith(extension):
                return extractor(self, package_path)

        # Provide more descriptive error message
        supported_formats = ", ".join(self._EXTRACTORS)
        raise ValueError(
            f"Unsupported package format: {package_name}. "
            f"Supported formats: {supported_formats}"
        )

    def _add_metadata_attribute(
        self,
        project_page: model.ProjectDetail,
    ) -> model.ProjectDetail:
        """
        Add the data-core-metadata attribute to all supported package files.

        A file is flagged with ``dist_info_metadata=True`` when its filename
        ends with one of the extensions in ``_EXTRACTORS`` and it does not
        already carry a ``dist_info_metadata`` value. Every other file is
        passed through unchanged, and the original page is not mutated.
        """
        # str.endswith accepts a tuple; build it once rather than per file.
        supported_extensions = tuple(self._EXTRACTORS)

        files = []
        for file in project_page.files:
            is_supported = file.filename.endswith(supported_extensions)
            if is_supported and not file.dist_info_metadata:
                file = replace(file, dist_info_metadata=True)
            files.append(file)
        return replace(project_page, files=tuple(files))
0 commit comments