Skip to content

Commit

Permalink
Improve CC scheme extraction and matching.
Browse files Browse the repository at this point in the history
This significantly improves the CC scheme extraction by:

 - Fixing the extraction of several schemes that were mixing
   certified and archived entries by accident.
 - Improving the extraction of cert_ids from scheme sites.
 - Improving the matching heuristic to consider more attributes
   that are usually present in the site data.

Also adds an evaluation notebook for measuring how well the scheme extraction and matching perform.
  • Loading branch information
J08nY committed Nov 7, 2024
1 parent 11a7052 commit 17ae4db
Show file tree
Hide file tree
Showing 15 changed files with 326 additions and 166 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ repos:
- "types-PyYAML"
- "types-python-dateutil"
- "types-requests"
- "types-dateparser"
- "datasets"
2 changes: 1 addition & 1 deletion src/sec_certs/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class Configuration(BaseSettings):
description="True if new reference annotator model shall be build, False otherwise.",
)
cc_matching_threshold: int = Field(
90,
70,
description="Level of required similarity before CC scheme entry is considered to match a CC certificate.",
ge=0,
le=100,
Expand Down
4 changes: 3 additions & 1 deletion src/sec_certs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@
CC_KOREA_BASE_URL = "https://itscc.kr"
CC_KOREA_EN_URL = CC_KOREA_BASE_URL + "/main/mainEn.do"
CC_KOREA_CERTIFIED_URL = CC_KOREA_BASE_URL + "/certprod/listA.do"
CC_KOREA_PRODUCT_URL = CC_KOREA_BASE_URL + "/certprod/view.do?product_id={}&product_class=1"
CC_KOREA_SUSPENDED_URL = CC_KOREA_BASE_URL + "/certprod/listB.do"
CC_KOREA_ARCHIVED_URL = CC_KOREA_BASE_URL + "/certprod/listD.do"
CC_KOREA_PRODUCT_URL = CC_KOREA_BASE_URL + "/certprod/view.do?product_id={}&product_class={}"
CC_POLAND_BASE_URL = "https://en.nask.pl"
CC_POLAND_CERTIFIED_URL = CC_POLAND_BASE_URL + "/eng/activities/certification/list-of-certificates"
CC_POLAND_INEVAL_URL = CC_POLAND_BASE_URL + "/eng/activities/certification/ongoing-certifications"
Expand Down
4 changes: 2 additions & 2 deletions src/sec_certs/dataset/cc.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,12 +870,12 @@ def _compute_scheme_data(self):
for scheme in self.auxiliary_datasets.scheme_dset:
if certified := scheme.lists.get(EntryType.Certified):
certs = [cert for cert in self if cert.status == "active"]
matches = CCSchemeMatcher.match_all(certified, scheme.country, certs)
matches, scores = CCSchemeMatcher.match_all(certified, scheme.country, certs)
for dgst, match in matches.items():
self[dgst].heuristics.scheme_data = match
if archived := scheme.lists.get(EntryType.Archived):
certs = [cert for cert in self if cert.status == "archived"]
matches = CCSchemeMatcher.match_all(archived, scheme.country, certs)
matches, scores = CCSchemeMatcher.match_all(archived, scheme.country, certs)
for dgst, match in matches.items():
self[dgst].heuristics.scheme_data = match

Expand Down
84 changes: 72 additions & 12 deletions src/sec_certs/model/cc_matching.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from __future__ import annotations

import contextlib
import re
from collections.abc import Iterable, Mapping, Sequence
from operator import itemgetter
from typing import Any

from sec_certs.cert_rules import rules
from sec_certs.configuration import config
from sec_certs.model.matching import AbstractMatcher
from sec_certs.sample.cc import CCCertificate
from sec_certs.sample.cc_certificate_id import CertificateId
from sec_certs.sample.cc_certificate_id import CertificateId, schemes
from sec_certs.utils.sanitization import sanitize_link_fname
from sec_certs.utils.strings import fully_sanitize_string

CATEGORIES = {
Expand Down Expand Up @@ -40,21 +44,23 @@ def __init__(self, entry: Mapping, scheme: str):
self.scheme = scheme
self._prepare()

def _get_from_entry(self, *keys: str) -> str | None:
for key in keys:
if val := self.entry.get(key):
return val
def _get_from_entry(self, *keys: str) -> Any | None:
# Prefer enhanced over base
if e := self.entry.get("enhanced"):
for key in keys:
if val := e.get(key):
return val
for key in keys:
if val := self.entry.get(key):
return val
return None

def _prepare(self):
def _prepare(self): # noqa: C901
self._canonical_cert_id = None
if cert_id := self._get_from_entry("cert_id", "id"):
self._cert_id = self._get_from_entry("cert_id", "id")
if self._cert_id:
with contextlib.suppress(Exception):
self._canonical_cert_id = CertificateId(self.scheme, cert_id).canonical
self._canonical_cert_id = CertificateId(self.scheme, self._cert_id).canonical

self._product = None
if product_name := self._get_from_entry("product", "title", "name"):
Expand All @@ -65,11 +71,39 @@ def _prepare(self):
self._vendor = fully_sanitize_string(vendor_name)

self._category = self._get_from_entry("category")
self._certification_date = self._get_from_entry("certification_date")
self._expiration_date = self._get_from_entry("expiration_date")
self._level = self._get_from_entry("level", "assurance_level")
if self._level:
self._level = self._level.upper().replace("AUGMENTED", "").replace("WITH", "")

filename_rules = rules["cc_filename_cert_id"][self.scheme]
scheme_meta = schemes[self.scheme]
if filename_rules and self._canonical_cert_id is None:
cert_link = self._get_from_entry("cert_link")
if cert_link:
cert_fname = sanitize_link_fname(cert_link)
for rule in filename_rules:
if match := re.match(rule, cert_fname):
with contextlib.suppress(Exception):
meta = match.groupdict()
self._canonical_cert_id = scheme_meta(meta)
break

report_link = self._get_from_entry("report_link")
if report_link and self._canonical_cert_id is None:
report_fname = sanitize_link_fname(report_link)
for rule in filename_rules:
if match := re.match(rule, report_fname):
with contextlib.suppress(Exception):
meta = match.groupdict()
self._canonical_cert_id = scheme_meta(meta)
break

self._report_hash = self._get_from_entry("report_hash")
self._target_hash = self._get_from_entry("target_hash")

def match(self, cert: CCCertificate) -> float:
def match(self, cert: CCCertificate) -> float: # noqa: C901
"""
Compute the match of this matcher to the certificate, a float from 0 to 100.
Expand Down Expand Up @@ -105,21 +139,47 @@ def match(self, cert: CCCertificate) -> float:
return 93

# Fuzzy match at the end with some penalization.
# Weigh the name and vendor more than the id and more than the level and certification date.
# 6, 6, 4, 2, 2
matches = {}
product_rating = self._compute_match(self._product, cert_name)
matches["product"] = (product_rating, 6)
vendor_rating = self._compute_match(self._vendor, cert_manufacturer)
return max((0, product_rating * 0.5 + vendor_rating * 0.5 - 2))
matches["vendor"] = (vendor_rating, 6)

if self._cert_id is not None and cert.heuristics.cert_id is not None:
id_rating = self._compute_match(self._cert_id, cert.heuristics.cert_id)
matches["id"] = (id_rating, 4)

if self._certification_date is not None and cert.not_valid_before is not None:
date_rating = 1
if cert.not_valid_before.year == self._certification_date.year:
date_rating += 33
if cert.not_valid_before.month == self._certification_date.month:
date_rating += 33
if cert.not_valid_before.day == self._certification_date.day:
date_rating += 33
matches["certification_date"] = (date_rating, 2)

if self._level is not None and cert.security_level:
level_rating = self._compute_match(self._level, ", ".join(cert.security_level))
matches["level"] = (level_rating, 2)
total_weight = sum(map(itemgetter(1), matches.values()))
return max((0, sum(match[0] * (match[1] / total_weight) for match in matches.values()) - 2))

@classmethod
def match_all(
cls, entries: list[dict[str, Any]], scheme: str, certificates: Iterable[CCCertificate]
) -> dict[str, dict[str, Any]]:
) -> tuple[dict[str, dict[str, Any]], dict[str, float]]:
"""
Match all entries of a given CC scheme to certificates from the dataset.
:param entries: The entries from the scheme, obtained from CCSchemeDataset.
:param scheme: The scheme, e.g. "DE".
:param certificates: The certificates to match against.
:return: A mapping of certificate digests to entries, without duplicates, not all entries may be present.
:return: Two mappings:
- A mapping of certificate digests to entries, without duplicates, not all entries may be present.
- A mapping of certificate digests to scores that they matched with.
"""
certs: list[CCCertificate] = list(filter(lambda cert: cert.scheme == scheme, certificates))
matchers: Sequence[CCSchemeMatcher] = [CCSchemeMatcher(entry, scheme) for entry in entries]
Expand Down
8 changes: 6 additions & 2 deletions src/sec_certs/model/matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def _compute_match(self, one: str, other: str) -> float:
)

@staticmethod
def _match_certs(matchers: Sequence[AbstractMatcher], certs: list[CertSubType], threshold: float):
def _match_certs(
matchers: Sequence[AbstractMatcher], certs: list[CertSubType], threshold: float
) -> tuple[dict[str, Any], dict[str, float]]:
scores: list[tuple[float, int, int]] = []
matched_is: set[int] = set()
matched_js: set[int] = set()
Expand All @@ -39,6 +41,7 @@ def _match_certs(matchers: Sequence[AbstractMatcher], certs: list[CertSubType],
triple = (100 - score, i, j)
heappush(scores, triple)
results = {}
final_scores = {}
for triple in (heappop(scores) for _ in range(len(scores))):
inv_score, i, j = triple
# Do not match already matched entries/certs.
Expand All @@ -55,4 +58,5 @@ def _match_certs(matchers: Sequence[AbstractMatcher], certs: list[CertSubType],
cert = certs[i]
entry = matchers[j].entry
results[cert.dgst] = entry
return results
final_scores[cert.dgst] = score
return results, final_scores
2 changes: 1 addition & 1 deletion src/sec_certs/rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cc_cert_id:
- "DCSS[Ii]-(?P<year>[0-9]{2,4})/(?P<counter>[0-9]+)([vV](?P<version>[0-9]))?"
- "Rapport de certification (?P<year>[0-9]{2,4})/(?P<counter>[0-9]+)([vV](?P<version>[0-9]))?"
- "Certification Report (?P<year>[0-9]{2,4})/(?P<counter>[0-9]+)([vV](?P<version>[0-9]))?"
- "ANSS[Ii](?:-CC)?[ -](?P<year>[0-9]{2,4})[/_-](?P<counter>[0-9]+)(?:-(?P<doc>(?:[MSR][0-9]+)))?([vV](?P<version>[0-9]))?"
- "ANSS[Ii](?:-CC)?(?:-(?P<type>PP|SITE))?[ -](?P<year>[0-9]{2,4})[/_-](?P<counter>[0-9]+)(?:-(?P<doc>(?:[MSR][0-9]+)))?([vV](?P<version>[0-9]))?"
# Examples:
# DCSSI-2009/07
# ANSSI-CC 2001/02-R01
Expand Down
18 changes: 17 additions & 1 deletion src/sec_certs/sample/cc_certificate_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ def FR(meta) -> str:
counter = meta["counter"]
doc = meta.get("doc")
version = meta.get("version")
cert_id = f"ANSSI-CC-{year}/{counter}"
type = meta.get("type")
cert_id = "ANSSI-CC-"
if type:
cert_id += f"{type}-"
cert_id += f"{year}/{counter}"
if doc:
cert_id += f"-{doc}"
if version:
Expand Down Expand Up @@ -183,6 +187,17 @@ def IT(meta) -> str:
return cert_id


def PL(meta) -> str:
    """
    Build the cert_id string for the "PL" scheme from its parsed parts.

    :param meta: Mapping of cert_id parts; ``number`` and ``year`` are required, ``ac`` is optional.
    :return: ``{number}/PC1/{year}``, or ``{number}/PC1/{ac}/{year}`` when ``ac`` is present and truthy.
    :raises KeyError: If ``number`` or ``year`` is missing from ``meta``.
    """
    number = meta["number"]
    ac = meta.get("ac")
    year = meta["year"]
    # The optional "ac" segment sits between the fixed "PC1" marker and the year.
    ac_part = f"{ac}/" if ac else ""
    return f"{number}/PC1/{ac_part}{year}"


# We have rules for some schemes to make canonical cert_ids.
schemes = {
"FR": FR,
Expand All @@ -202,6 +217,7 @@ def IT(meta) -> str:
"TR": TR,
"SG": SG,
"IT": IT,
"PL": PL,
}


Expand Down
Loading

0 comments on commit 17ae4db

Please sign in to comment.