From 63fa229e1c68674dbc9c9f641c6abeed09b8c301 Mon Sep 17 00:00:00 2001 From: Alexey Ovchinnikov Date: Mon, 23 Dec 2024 18:24:47 -0600 Subject: [PATCH] SPDX rework: additional code cleanup Adding test conditions and simplifying overall code structure. --- cachi2/core/models/output.py | 3 +- cachi2/core/models/property_semantics.py | 30 +- cachi2/core/models/sbom.py | 294 ++++++++++--------- cachi2/core/utils.py | 9 +- cachi2/interface/cli.py | 41 +-- tests/unit/models/test_property_semantics.py | 4 +- tests/unit/models/test_sbom.py | 2 + tests/unit/test_merge_spdx.py | 84 +++++- 8 files changed, 266 insertions(+), 201 deletions(-) diff --git a/cachi2/core/models/output.py b/cachi2/core/models/output.py index a555a243d..41b34d125 100644 --- a/cachi2/core/models/output.py +++ b/cachi2/core/models/output.py @@ -6,8 +6,7 @@ import pydantic from cachi2.core.errors import Cachi2Error -from cachi2.core.models.property_semantics import merge_component_properties -from cachi2.core.models.sbom import Component, Sbom +from cachi2.core.models.sbom import Component, Sbom, merge_component_properties from cachi2.core.models.validators import unique_sorted log = logging.getLogger(__name__) diff --git a/cachi2/core/models/property_semantics.py b/cachi2/core/models/property_semantics.py index 6261b9a77..71bab66c2 100644 --- a/cachi2/core/models/property_semantics.py +++ b/cachi2/core/models/property_semantics.py @@ -1,27 +1,27 @@ -import functools from dataclasses import dataclass, field -from itertools import groupby -from typing import TYPE_CHECKING, Iterable, Optional +from typing import TYPE_CHECKING, Iterable, Literal, Optional + +import pydantic if TYPE_CHECKING: from typing_extensions import Self, assert_never -from cachi2.core.models.sbom import Component, Property +PropertyName = Literal[ + "cachi2:bundler:package:binary", + "cachi2:found_by", + "cachi2:missing_hash:in_file", + "cachi2:pip:package:binary", + "cdx:npm:package:bundled", + "cdx:npm:package:development", +] -def merge_component_properties(components: Iterable[Component]) -> list[Component]: - """Sort and de-duplicate components while merging their `properties`.""" - components = sorted(components, key=Component.key) - grouped_components = groupby(components, key=Component.key) - def merge_component_group(component_group: Iterable[Component]) -> Component: - component_group = list(component_group) - prop_sets = (PropertySet.from_properties(c.properties) for c in component_group) - merged_prop_set = functools.reduce(PropertySet.merge, prop_sets) - component = component_group[0] - return component.model_copy(update={"properties": merged_prop_set.to_properties()}) +class Property(pydantic.BaseModel): + """A property inside an SBOM component.""" - return [merge_component_group(g) for _, g in grouped_components] + name: PropertyName + value: str @dataclass(frozen=True) diff --git a/cachi2/core/models/sbom.py b/cachi2/core/models/sbom.py index 3f3c929e0..0ab39e674 100644 --- a/cachi2/core/models/sbom.py +++ b/cachi2/core/models/sbom.py @@ -1,33 +1,23 @@ import datetime import hashlib import json +import logging from collections import defaultdict -from functools import cached_property +from functools import cached_property, partial, reduce +from itertools import chain, groupby from pathlib import Path from typing import Annotated, Any, Dict, Iterable, Literal, Optional, Union from urllib.parse import urlparse import pydantic from packageurl import PackageURL +from typing_extensions import Self +from cachi2.core.models.property_semantics import 
Property, PropertySet
 from cachi2.core.models.validators import unique_sorted
-from cachi2.core.utils import first
-
-PropertyName = Literal[
-    "cachi2:bundler:package:binary",
-    "cachi2:found_by",
-    "cachi2:missing_hash:in_file",
-    "cachi2:pip:package:binary",
-    "cdx:npm:package:bundled",
-    "cdx:npm:package:development",
-]
-
-
-class Property(pydantic.BaseModel):
-    """A property inside an SBOM component."""
+from cachi2.core.utils import first_for, partition_by
 
-    name: PropertyName
-    value: str
+log = logging.getLogger(__name__)
 
 
 class ExternalReference(pydantic.BaseModel):
@@ -96,6 +86,11 @@ class Metadata(pydantic.BaseModel):
     tools: list[Tool] = [Tool(vendor="red hat", name="cachi2")]
 
 
+def spdx_now() -> str:
+    """Return a time stamp in SPDX-compliant format."""
+    return datetime.datetime.now().isoformat()[:-7] + "Z"
+
+
 class Sbom(pydantic.BaseModel):
     """Software bill of materials in the CycloneDX format.
 
@@ -111,11 +106,27 @@ class Sbom(pydantic.BaseModel):
     spec_version: str = pydantic.Field(alias="specVersion", default="1.4")
     version: int = 1
 
+    def __add__(self, other: Union["Sbom", "SPDXSbom"]) -> "Sbom":
+        if isinstance(other, self.__class__):
+            return Sbom(
+                components=merge_component_properties(
+                    chain.from_iterable(s.components for s in [self, other])
+                )
+            )
+        else:
+            return self + other.to_cyclonedx()
+
     @pydantic.field_validator("components")
     def _unique_components(cls, components: list[Component]) -> list[Component]:
         """Sort and de-duplicate components."""
         return unique_sorted(components, by=lambda component: component.key())
 
+    def to_cyclonedx(self) -> Self:
+        """Return self; self is already the right type of Sbom."""
+        # This is a short-cut, but since it is unlikely that we would ever add more Sbom types
+        # it is acceptable. If, however, this ever happens, a proper base class will be needed.
+        return self
+
     def to_spdx(self, doc_namespace: str) -> "SPDXSbom":
         """Convert a CycloneDX SBOM to an SPDX SBOM.
 
@@ -123,87 +134,70 @@ def to_spdx(self, doc_namespace: str) -> "SPDXSbom":
             doc_namespace: SPDX document namespace. 
The namespace is a URI identifying the document.
         """
-        packages = []
-        relationships = []
-
-        packages.append(
-            SPDXPackage(
-                name="",
-                versionInfo="",
-                SPDXID="SPDXRef-DocumentRoot-File-",
-            )
-        )
-
-        for component in self.components:
-            annotations = []
-            for prop in component.properties:
-                annotations.append(
-                    SPDXPackageAnnotation(
-                        annotator="Tool: cachi2:jsonencoded",
-                        annotationDate=datetime.datetime.now().isoformat()[:-7] + "Z",
-                        annotationType="OTHER",
-                        comment=json.dumps(
-                            {"name": f"{prop.name}", "value": f"{prop.value}"},
-                        ),
-                    )
-                )
-            package_hash = SPDXPackage._calculate_package_hash_from_dict(
-                {
-                    "name": component.name,
-                    "version": component.version,
-                    "purl": component.purl,
-                }
-            )
-            packages.append(
-                SPDXPackage(
-                    SPDXID=f"SPDXRef-Package-{component.name}-{component.version}-{package_hash}",
-                    name=component.name,
-                    versionInfo=component.version,
-                    externalRefs=[
-                        dict(
-                            referenceCategory="PACKAGE-MANAGER",
-                            referenceLocator=component.purl,
-                            referenceType="purl",
-                        )
-                    ],
-                    annotations=annotations,
-                )
-            )
+        def create_document_root() -> SPDXPackage:
+            return SPDXPackage(name="", versionInfo="", SPDXID="SPDXRef-DocumentRoot-File-")
 
-        relationships.append(
-            SPDXRelation(
+        def create_root_relationship() -> SPDXRelation:
+            return SPDXRelation(
                 spdxElementId="SPDXRef-DOCUMENT",
                 comment="",
                 relatedSpdxElement="SPDXRef-DocumentRoot-File-",
                 relationshipType="DESCRIBES",
             )
-        )
 
-        for package in packages:
-            if package.SPDXID == "SPDXRef-DocumentRoot-File-":
-                continue
-            relationships.append(
-                SPDXRelation(
-                    spdxElementId="SPDXRef-DocumentRoot-File-",
-                    comment="",
-                    relatedSpdxElement=package.SPDXID,
-                    relationshipType="CONTAINS",
+        def link_to_root(packages: Union[list[SPDXPackage], list[SPDXFile]]) -> list[SPDXRelation]:
+            relationships, root_id, rtype = [], "SPDXRef-DocumentRoot-File-", "CONTAINS"
+            pRel = partial(SPDXRelation, spdxElementId=root_id, comment="", relationshipType=rtype)
+            for package in packages:
+                if package.SPDXID == "SPDXRef-DocumentRoot-File-":
+                    continue
+                relationships.append(pRel(relatedSpdxElement=package.SPDXID))
+            return relationships
+
+        def libs_to_packages(libraries: list[Component]) -> list[SPDXPackage]:
+            packages, annottr, now = [], "Tool: cachi2:jsonencoded", spdx_now()
+            args = dict(annotator=annottr, annotationDate=now, annotationType="OTHER")
+            pAnnotation = partial(SPDXPackageAnnotation, **args)
+
+            # noqa for trivial helpers.
+            mkcomm = lambda p: json.dumps(dict(name=f"{p.name}", value=f"{p.value}"))  # noqa: E731
+            hashdict = lambda c: dict(name=c.name, version=c.version, purl=c.purl)  # noqa: E731
+            erefbase = dict(referenceCategory="PACKAGE-MANAGER", referenceType="purl")
+            erefdict = lambda c: dict(referenceLocator=c.purl, **erefbase)  # noqa: E731
+
+            for component in libraries:
+                package_hash = SPDXPackage._calculate_package_hash_from_dict(hashdict(component))
+                packages.append(
+                    SPDXPackage(
+                        SPDXID=f"SPDXRef-Package-{component.name}-{component.version}-{package_hash}",
+                        name=component.name,
+                        versionInfo=component.version,
+                        externalRefs=[erefdict(component)],
+                        annotations=[pAnnotation(comment=mkcomm(p)) for p in component.properties],
+                    )
                 )
-            )
+            return packages
+
+        def files_to_packages(cydxfiles: list[Component]) -> list[SPDXFile]:
+            return [SPDXFile(SPDXID=f"SPDXRef-File-{f.name}", fileName=f.name) for f in cydxfiles]
+
+        # Main function body.
+        cydxfiles, libraries = partition_by(lambda c: c.type == "library", self.components)
+        # mypy is upset by partition_by being broadly typed. 
+ packages = [create_document_root()] + libs_to_packages(libraries) # type: ignore + files = files_to_packages(cydxfiles) # type: ignore + relationships = [create_root_relationship()] + link_to_root(packages) + link_to_root(files) + # noqa for a trivial helper. + creator = lambda tool: [f"Tool: {tool.name}", f"Organization: {tool.vendor}"] # noqa: E731 return SPDXSbom( packages=packages, relationships=relationships, + files=files, documentNamespace=doc_namespace, creationInfo=SPDXCreationInfo( - creators=sum( - [ - [f"Tool: {tool.name}", f"Organization: {tool.vendor}"] - for tool in self.metadata.tools - ], - [], - ), - created=datetime.datetime.now().isoformat()[:-7] + "Z", + creators=sum([creator(tool) for tool in self.metadata.tools], []), + created=spdx_now(), ), ) @@ -353,6 +347,14 @@ def __hash__(self) -> int: ) +def _extract_purls(from_refs: list[SPDXPackageExternalRefType]) -> list[str]: + return [ref.referenceLocator for ref in from_refs if ref.referenceType == "purl"] + + +def _parse_purls(purls: list[str]) -> list[PackageURL]: + return [PackageURL.from_string(purl) for purl in purls if purl] + + class SPDXPackage(pydantic.BaseModel): """SPDX Package. @@ -389,8 +391,7 @@ def _purls_validation( cls, refs: list[SPDXPackageExternalRefType] ) -> list[SPDXPackageExternalRefType]: """Validate that SPDXPackage includes only one purl with the same type, name, version.""" - purls = [ref.referenceLocator for ref in refs if ref.referenceType == "purl"] - parsed_purls = [PackageURL.from_string(purl) for purl in purls if purl] + parsed_purls = _parse_purls(_extract_purls(from_refs=refs)) unique_purls_parts = set([(p.type, p.name, p.version) for p in parsed_purls]) if len(unique_purls_parts) > 1: raise ValueError( @@ -404,7 +405,7 @@ def from_package_dict(cls, package: dict[str, Any]) -> "SPDXPackage": """Create a SPDXPackage from a Cachi2 package dictionary.""" external_refs = package.get("externalRefs", []) annotations = [SPDXPackageAnnotation(**an) for an in package.get("annotations", [])] - if not package.get("SPDXID"): + if package.get("SPDXID") is None: purls = sorted( [ ref["referenceLocator"] @@ -463,6 +464,16 @@ def __hash__(self) -> int: ) +def _unique(elements: Iterable) -> list: + out, seen = [], set() + for el in elements: + if el in seen: + continue + seen.add(el) + out.append(el) + return out + + class SPDXSbom(pydantic.BaseModel): """Software bill of materials in the SPDX format. @@ -507,33 +518,26 @@ def deduplicate_spdx_packages(items: Iterable[SPDXPackage]) -> list[SPDXPackage] merge external references of all the packages into one package. """ # NOTE: keeping this implementation mostly intact for historical reasons. - unique_items = {} + unique_items: dict[tuple, SPDXPackage] = {} for item in items: - purls = [ - ref.referenceLocator for ref in item.externalRefs if ref.referenceType == "purl" - ] + purls = _extract_purls(item.externalRefs) if purls: - keys = [PackageURL.from_string(purl) for purl in purls if purl] - # name can exist in mutiple namespaces, e.g. rand exists in both math and + # name can exist in multiple namespaces, e.g. rand exists in both math and # crypto, and must be distinguished between. 
- p = keys[0] - package_name = p.name - if p.namespace is not None: - package_name = p.namespace + "/" + p.name - purl_tnv = (p.type, package_name, p.version) + purl = _parse_purls(purls)[0] + package_name = purl.name + if purl.namespace is not None: + package_name = purl.namespace + "/" + purl.name + purl_tnv = (purl.type, package_name, purl.version) else: + log.warning(f"No purls found for {item}.") purl_tnv = ("", item.name, item.versionInfo or "") - if purl_tnv not in unique_items: - # TODO: model_copy? - unique_items[purl_tnv] = SPDXPackage( - SPDXID=item.SPDXID, name=item.name, versionInfo=item.versionInfo - ) - unique_items[purl_tnv].externalRefs = item.externalRefs[:] - unique_items[purl_tnv].annotations = item.annotations[:] - else: + if purl_tnv in unique_items: unique_items[purl_tnv].externalRefs.extend(item.externalRefs) unique_items[purl_tnv].annotations.extend(item.annotations) + else: + unique_items[purl_tnv] = item.model_copy(deep=True) for item in unique_items.values(): item.externalRefs = sorted( @@ -562,14 +566,14 @@ def root_id(self) -> str: for rel in self.relationships: direct_relationships[rel.spdxElementId].append(rel.relatedSpdxElement) inverse_relationships[rel.relatedSpdxElement] = rel.spdxElementId - # noqa because the name is bound to make local intent clearer and first() call easier to - # follow. + # noqa because the name is bound to make local intent clearer and + # first_for() call easier to follow. unidirectionally_related_package = ( lambda p: inverse_relationships.get(p) == self.SPDXID # noqa: E731 ) # Note: defaulting to top-level SPDXID is inherited from the original implementation. # It is unclear if it is really needed, but is left around to match the precedent. - root_id = first(unidirectionally_related_package, direct_relationships, self.SPDXID) + root_id = first_for(unidirectionally_related_package, direct_relationships, self.SPDXID) return root_id # NOTE: having this as cached will cause trouble when sequentially @@ -603,23 +607,19 @@ def retarget_and_prune_relationships( def __add__(self, other: Union["SPDXSbom", Sbom]) -> "SPDXSbom": if isinstance(other, self.__class__): - # Packages are not going to be modified so it is OK to just pass references around. + # Packages are not going to be modified so it is OK to just pass + # references around. merged_packages = self.packages + other.non_root_packages # Relationships, on the other hand, are amended, so new - # relationships will be constructed. - # Further, identical relationships should be dropped. - # Deduplication based on building a set is considered safe because all - # fields of all elements are used to compute a hash. - # Just using set won't work satisfactory since that would shuffle relationships. - merged_relationships, seen_relationships = [], set() + # relationships will be constructed. Further, identical + # relationships should be dropped. Deduplication based on building + # a set is considered safe because all fields of all elements are + # used to compute a hash. processed_other = self.retarget_and_prune_relationships(from_sbom=other, to_sbom=self) - for r in self.relationships + processed_other: - if r not in seen_relationships: - seen_relationships.add(r) - merged_relationships.append(r) + merged_relationships = _unique(self.relationships + processed_other) # The same as packages: files are not modified, so keeping them as is. # Identical file entries should be skipped. 
-        merged_files = list(set(self.files + other.files))
+        merged_files = _unique(self.files + other.files)
         res = self.model_copy(
             update={
                 # At the moment of writing pydantic does not deem it necessary to
@@ -633,7 +633,16 @@ def __add__(self, other: Union["SPDXSbom", Sbom]) -> "SPDXSbom":
             return res
         elif isinstance(other, Sbom):
             return self + other.to_spdx(doc_namespace="NOASSERTION")
-        raise Exception("Something smart goes here")
+        else:
+            self_class = self.__class__.__name__
+            other_class = other.__class__.__name__
+            raise ValueError(f"Cannot merge {other_class} into {self_class}")
+
+    def to_spdx(self, *a: Any, **k: Any) -> Self:
+        """Return self, ignoring arguments; self is already an SPDX document."""
+        # This is a short-cut, but since it is unlikely that we would ever add more Sbom types
+        # it is acceptable. If, however, this ever happens, a proper base class will be needed.
+        return self
 
     def to_cyclonedx(self) -> Sbom:
         """Convert a SPDX SBOM to a CycloneDX SBOM."""
@@ -647,35 +656,21 @@ def to_cyclonedx(self) -> Sbom:
             )
             for an in package.annotations
         ]
-        purls = [
-            ref.referenceLocator for ref in package.externalRefs if ref.referenceType == "purl"
-        ]
+        pComponent = partial(
+            Component, name=package.name, version=package.versionInfo, properties=properties
+        )
+        purls = _extract_purls(package.externalRefs)
         # cyclonedx doesn't support multiple purls, therefore
         # a new component is created for each purl
-        for purl in purls:
-            components.append(
-                Component(
-                    name=package.name,
-                    version=package.versionInfo,
-                    purl=purl,
-                    properties=properties,
-                )
-            )
+        components += [pComponent(purl=purl) for purl in purls]
         # if there's no purl and no package name or version, it's just a wrapping element for
         # spdx package which is one layer below SPDXDocument in relationships
         if not any((purls, package.name, package.versionInfo)):
             continue
         # if there's no purl, add it as a single component
        elif not purls:
-            components.append(
-                Component(
-                    name=package.name,
-                    version=package.versionInfo,
-                    properties=properties,
-                    purl="",
-                )
-            )
+            components.append(pComponent(purl=""))
         tools = []
         name, vendor = None, None
         # Following approach is used as position of "Organization" and "Tool" is not
@@ -695,5 +690,20 @@ def to_cyclonedx(self) -> Sbom:
         )
 
 
+def merge_component_properties(components: Iterable[Component]) -> list[Component]:
+    """Sort and de-duplicate components while merging their `properties`."""
+    components = sorted(components, key=Component.key)
+    grouped_components = groupby(components, key=Component.key)
+
+    def merge_component_group(component_group: Iterable[Component]) -> Component:
+        component_group = list(component_group)
+        prop_sets = (PropertySet.from_properties(c.properties) for c in component_group)
+        merged_prop_set = reduce(PropertySet.merge, prop_sets)
+        component = component_group[0]
+        return component.model_copy(update={"properties": merged_prop_set.to_properties()})
+
+    return [merge_component_group(g) for _, g in grouped_components]
+
+
 # References
 # [1] https://github.com/pydantic/pydantic/blob/6fa92d139a297a26725dec0a7f9b0cce912d6a7f/pydantic/main.py#L383
diff --git a/cachi2/core/utils.py b/cachi2/core/utils.py
index ad6415fd8..49efdf1a1 100644
--- a/cachi2/core/utils.py
+++ b/cachi2/core/utils.py
@@ -7,6 +7,7 @@
 import subprocess
 import sys
 from functools import cache
+from itertools import filterfalse, tee
 from pathlib import Path
 from typing import Any, Callable, Iterable, Iterator, Optional, Sequence
 
@@ -197,6 +198,12 @@ def get_cache_dir() -> Path:
     return cache_dir.joinpath("cachi2") 
-def first(predicate: Callable, iterable: Iterable, fallback: Any) -> Any: +def first_for(predicate: Callable, iterable: Iterable, fallback: Any) -> Any: """Return the first match of predicate in iterable or fallback value.""" return next((x for x in iterable if predicate(x)), fallback) + + +def partition_by(predicate: Callable, iterable: Iterable) -> tuple[Iterable, Iterable]: + """Partition iterable in two by predicate.""" + i1, i2 = tee(iterable) + return filterfalse(predicate, i1), filter(predicate, i2) diff --git a/cachi2/interface/cli.py b/cachi2/interface/cli.py index fc3899f3a..a7c2f7290 100644 --- a/cachi2/interface/cli.py +++ b/cachi2/interface/cli.py @@ -1,4 +1,3 @@ -import datetime import enum import functools import importlib.metadata @@ -6,7 +5,6 @@ import logging import shutil import sys -from itertools import chain from pathlib import Path from typing import Any, Callable, List, Optional, Union @@ -18,8 +16,7 @@ from cachi2.core.extras.envfile import EnvFormat, generate_envfile from cachi2.core.models.input import Flag, PackageInput, Request, parse_user_input from cachi2.core.models.output import BuildConfig -from cachi2.core.models.property_semantics import merge_component_properties -from cachi2.core.models.sbom import Sbom, SPDXSbom +from cachi2.core.models.sbom import Sbom, SPDXSbom, spdx_now from cachi2.core.resolver import inject_files_post, resolve_packages, supported_package_managers from cachi2.core.rooted_path import RootedPath from cachi2.interface.logging import LogLevel, setup_logging @@ -427,9 +424,10 @@ def merge_sboms( ) -> None: """Merge two or more SBOMs into one. - The command works with Cachi2-generated SBOMs only. You might want to run + The command works with Cachi2-generated SBOMs and with a supported subset of + SPDX SBOMs. You might want to run - cachi2 fetch-deps + cachi2 fetch-deps first to produce SBOMs to merge. """ @@ -446,33 +444,16 @@ def merge_sboms( sboms_to_merge.append(SPDXSbom(**sbom_dict)) except pydantic.ValidationError: raise UnexpectedFormat(f"{sbom_file} does not appear to be a valid Cachi2 SBOM.") - + # start_sbom will later coerce every other SBOM to its type. + start_sbom: Union[Sbom, SPDXSbom] # this visual noise is demanded by mypy. 
if sbom_type == SBOMFormat.cyclonedx:
-        cyclonedx_sboms_to_merge = []
-        for _sbom in sboms_to_merge:
-            if not isinstance(_sbom, Sbom):
-                cyclonedx_sboms_to_merge.append(_sbom.to_cyclonedx())
-            else:
-                cyclonedx_sboms_to_merge.append(_sbom)
-        # TODO: merging SBOMs should be done uniformly via "+"
-        sbom: Union[Sbom, SPDXSbom] = Sbom(
-            components=merge_component_properties(
-                chain.from_iterable(s.components for s in cyclonedx_sboms_to_merge)
-            )
-        )
+        start_sbom = sboms_to_merge[0].to_cyclonedx()
     else:
-        spdx_sboms_to_merge = []
-        for _sbom in sboms_to_merge:
-            if not isinstance(_sbom, SPDXSbom):
-                spdx_sboms_to_merge.append(_sbom.to_spdx(doc_namespace="NOASSERTION"))
-            else:
-                spdx_sboms_to_merge.append(_sbom)
-
-        sbom = sum(spdx_sboms_to_merge, start=spdx_sboms_to_merge[0])
+        start_sbom = sboms_to_merge[0].to_spdx(doc_namespace="NOASSERTION")
 
     if sbom_name is not None:
-        sbom.name = sbom_name
-        sbom.creationInfo.created = datetime.datetime.now().isoformat()[:-7] + "Z"
-
+        start_sbom.name = sbom_name
+        start_sbom.creationInfo.created = spdx_now()
+    sbom = sum(sboms_to_merge[1:], start=start_sbom)
     sbom_json = sbom.model_dump_json(indent=2, by_alias=True, exclude_none=True)
 
     if output_sbom_file_name is not None:
diff --git a/tests/unit/models/test_property_semantics.py b/tests/unit/models/test_property_semantics.py
index 29c87b2a9..98bfb0b8e 100644
--- a/tests/unit/models/test_property_semantics.py
+++ b/tests/unit/models/test_property_semantics.py
@@ -1,7 +1,7 @@
 import pytest
 
-from cachi2.core.models.property_semantics import PropertySet, merge_component_properties
-from cachi2.core.models.sbom import Component, Property
+from cachi2.core.models.property_semantics import PropertySet
+from cachi2.core.models.sbom import Component, Property, merge_component_properties
 
 
 @pytest.mark.parametrize(
diff --git a/tests/unit/models/test_sbom.py b/tests/unit/models/test_sbom.py
index dc049eb53..bd1000b8e 100644
--- a/tests/unit/models/test_sbom.py
+++ b/tests/unit/models/test_sbom.py
@@ -1019,7 +1019,9 @@ def test_spdx_sbom_can_be_converted_to_cyclonedx_and_back_without_loosing_any_da
         },
     ],
 )
+
     spdx_sbom = sbom.to_cyclonedx().to_spdx("NOASSERTION")
+
     assert json.dumps(sbom.model_dump(), sort_keys=True, indent=4) == json.dumps(
         spdx_sbom.model_dump(), sort_keys=True, indent=4
     )
diff --git a/tests/unit/test_merge_spdx.py b/tests/unit/test_merge_spdx.py
index 0f80f98f6..7abe0342b 100644
--- a/tests/unit/test_merge_spdx.py
+++ b/tests/unit/test_merge_spdx.py
@@ -28,7 +28,7 @@ def _assert_merging_sboms_produces_correct_number_of_packages(
     # Ignoring mypy because it is cheaper to rebind the name in a local utility.
     sources = set(sources)  # type: ignore
     unqie_packages_across_all_sources = set(sum([s.packages for s in sources], []))
-    # Drop one root package per source, add 1 to account for one eventuial root:
+    # Drop one root package per source, add 1 to account for one eventual root:
     expected_num_of_packages = len(unqie_packages_across_all_sources) - len(sources) + 1
     actual_num_of_packages = len(merged_sbom.packages)
     difference = expected_num_of_packages - actual_num_of_packages
@@ -148,6 +148,16 @@ def _assert_sbom_is_well_formed(sbom: SPDXSbom) -> None:
     _assert_no_unrelated_files(sbom)
 
 
+def _assert_no_relationship_is_duplicated(sbom: SPDXSbom) -> None:
+    have_relationships = len(sbom.relationships)
+    should_have_relationships = len(set(sbom.relationships))
+    fail_msg = (
+        "Relationship duplication detected: "
+        f"{have_relationships - should_have_relationships} more relationships than expected." 
+ ) + assert have_relationships == should_have_relationships, fail_msg + + # Data for this test was generated with syft like this: # For a directory: # $ ./syft dir:experiments/ -o spdx-json > experiments.json @@ -225,6 +235,25 @@ def test_merging_two_spdx_sboms_works_in_general_independent_of_order( ), id="three unique SBOMs", ), + ], +) +def test_merging_several_spdx_sboms_works_in_general_independent_of_order( + sboms_to_merge: list[Any], # 'Any' is used to prevent mypy from having a fit over re-binding +) -> None: + sboms_to_merge = [SPDXSbom.from_file(Path(s)) for s in sboms_to_merge] + + merged_sbom = reduce(add, sboms_to_merge) + + _assert_all_relationships_are_within_the_document(for_sbom=merged_sbom) + _assert_merging_sboms_produces_correct_number_of_packages(merged_sbom, *sboms_to_merge) + _assert_root_was_inherited_from_left_sbom(merged_sbom, sboms_to_merge[0]) + _assert_there_is_only_one_root(merged_sbom) + _assert_sbom_is_well_formed(merged_sbom) + + +@pytest.mark.parametrize( + "sboms_to_merge", + [ pytest.param( ( "./tests/unit/data/alpine.pretty.json", @@ -232,28 +261,63 @@ def test_merging_two_spdx_sboms_works_in_general_independent_of_order( "./tests/unit/data/something.more.simple.0.100.0.spdx.pretty.json", "./tests/unit/data/something.simple0.100.0.spdx.pretty.json", ), - # The same SBOM is attempted to be merged in -- should be a separate test id="three unique SBOMs and a duplicate", ), + pytest.param( + ( + "./tests/unit/data/alpine.pretty.json", + "./tests/unit/data/alpine.pretty.json", + "./tests/unit/data/alpine.pretty.json", + "./tests/unit/data/alpine.pretty.json", + ), + id="merging with self", + ), ], ) -def test_merging_several_spdx_sboms_works_in_general_independent_of_order( +def test_merging_same_spdx_sbom_multiple_times_does_not_increase_the_number_of_packages( + sboms_to_merge: list[Any], # 'Any' is used to prevent mypy from having a fit over re-binding +) -> None: + sboms_to_merge = [SPDXSbom.from_file(Path(s)) for s in sboms_to_merge] + + merged_sbom = reduce(add, sboms_to_merge) + + _assert_all_relationships_are_within_the_document(for_sbom=merged_sbom) + _assert_merging_sboms_produces_correct_number_of_packages(merged_sbom, *sboms_to_merge) + _assert_root_was_inherited_from_left_sbom(merged_sbom, sboms_to_merge[0]) + _assert_there_is_only_one_root(merged_sbom) + _assert_sbom_is_well_formed(merged_sbom) + + +@pytest.mark.parametrize( + "sboms_to_merge", + [ + pytest.param( + ( + "./tests/unit/data/alpine.pretty.json", + "./tests/unit/data/alpine.pretty.json", + "./tests/unit/data/alpine.pretty.json", + "./tests/unit/data/alpine.pretty.json", + ), + id="merging with self", + ), + ], +) +def test_merging_spdx_on_self_does_not_modify_the_sbom( sboms_to_merge: list[Any], # 'Any' is used to prevent mypy from having a fit over re-binding ) -> None: sboms_to_merge = [SPDXSbom.from_file(Path(s)) for s in sboms_to_merge] merged_sbom = reduce(add, sboms_to_merge) - # merged_sbom = sum(sboms_to_merge[1:], sboms_to_merge[0]) ? 
_assert_all_relationships_are_within_the_document(for_sbom=merged_sbom) - # TODO: assert_no_relation_goes_missing(merged_sbom, *sboms) + _assert_no_relationship_is_duplicated(merged_sbom) _assert_merging_sboms_produces_correct_number_of_packages(merged_sbom, *sboms_to_merge) _assert_root_was_inherited_from_left_sbom(merged_sbom, sboms_to_merge[0]) _assert_there_is_only_one_root(merged_sbom) _assert_sbom_is_well_formed(merged_sbom) -def _same_order(sbom1: SPDXSbom, sbom2: SPDXSbom) -> bool: +def _same_relationship_order(sbom1: SPDXSbom, sbom2: SPDXSbom) -> bool: for r1, r2 in zip(sbom1.relationships, sbom2.relationships): if r1 != r2: return False @@ -272,16 +336,18 @@ def _same_order(sbom1: SPDXSbom, sbom2: SPDXSbom) -> bool: ), ], ) -def test_merging_spdx_sboms_produces_consistent_ordering( +def test_merging_spdx_sboms_produces_consistent_relationships_ordering( sboms_to_merge: list[Any], # 'Any' is used to prevent mypy from having a fit over re-binding ) -> None: + # TODO: this must be moved to integration tests due to hash seed dependency. + # This might require some rework to ITs. Keeping this code here as a reminder. sboms_to_merge = [SPDXSbom.from_file(Path(s)) for s in sboms_to_merge] merged_sbom1 = reduce(add, sboms_to_merge) merged_sbom2 = reduce(add, sboms_to_merge) merged_sbom3 = reduce(add, sboms_to_merge) - assert _same_order(merged_sbom1, merged_sbom2), "Order mismatch!" - assert _same_order(merged_sbom2, merged_sbom3), "Order mismatch!" + assert _same_relationship_order(merged_sbom1, merged_sbom2), "Order mismatch!" + assert _same_relationship_order(merged_sbom2, merged_sbom3), "Order mismatch!" _assert_sbom_is_well_formed(merged_sbom1)
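
Note for reviewers (illustrative, not part of the patch): a minimal sketch of the
merge API this change settles on. The input file names below are hypothetical;
the calls mirror those exercised by cli.py and the tests above.

    from pathlib import Path

    from cachi2.core.models.sbom import SPDXSbom

    # Hypothetical inputs; any mix of CycloneDX and SPDX Cachi2 SBOMs works.
    left = SPDXSbom.from_file(Path("a.spdx.json"))
    right = SPDXSbom.from_file(Path("b.spdx.json"))

    # SPDX + SPDX: packages are deduplicated, and relationships from the
    # right-hand SBOM are retargeted onto the left-hand root and pruned
    # of duplicates.
    merged = left + right

    # Mixed operands are coerced to the left-hand type; to_spdx() and
    # to_cyclonedx() are no-ops on already-converted documents, so a list
    # of SBOMs folds into one with sum(), as merge_sboms() now does:
    sboms = [left, right]
    merged = sum(sboms[1:], start=sboms[0].to_spdx(doc_namespace="NOASSERTION"))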