From a9544f709d514c2e6f8ee1664212b7b73b4bef01 Mon Sep 17 00:00:00 2001 From: Mart Ratas Date: Mon, 30 Sep 2024 11:05:52 +0100 Subject: [PATCH] CU-869574kvp update snomed preprocessing naming (#469) * CU-869574kvp: Add pattern based release version identifying for Snomed preprocessing * CU-869574kvp: Add tests for pattern-based snomed release identification * CU-869574kvp: Update Snomed preprocessing: Separate extensions into an Enum. Do the release/paths check at init to allow for early failures in case of issues * CU-869574kvp: Simplify mappings somewhat. Move common avoids to a common location. Fix UK Drug relationship name * CU-869574kvp: Simplify mappings somewhat more. Remove some clutter by separating common prefixes for release types and file names. * CU-869574kvp: Simplify mappings somewhat more, agai. Remove some clutter by separating common suffixes for release types. * CU-869574kvp: Update preprocessing. New abstraction. Use supprted extensions which describe their file formats along with bundles which give some further insight and control. * CU-869574kvp: Fix data class init * CU-869574kvp: Fix issue with file paths * CU-869574kvp: Fix a UK Clinical description file path * CU-869574kvp: Add (optional) 2nd part of folder name to extension. For AU models, the folder name seems to be 'SnomedCT_Release_AU1000036_20240630T120000Z', so the 1st part is just 'Release' and the 2nd part is indicative of AU. Add usage of this where relevant. * CU-869574kvp: Fix preprocessing tests. Add patch for files/folders where applicable. Change the paths of attributes where applicable. --- medcat/utils/preprocess_snomed.py | 408 ++++++++++++++++++-------- tests/utils/test_preprocess_snomed.py | 122 +++++++- 2 files changed, 392 insertions(+), 138 deletions(-) diff --git a/medcat/utils/preprocess_snomed.py b/medcat/utils/preprocess_snomed.py index ac78548a7..60bbb6994 100644 --- a/medcat/utils/preprocess_snomed.py +++ b/medcat/utils/preprocess_snomed.py @@ -3,6 +3,9 @@ import re import hashlib import pandas as pd +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass, field +from enum import Enum, auto def parse_file(filename, first_row_header=True, columns=None): @@ -61,6 +64,174 @@ def get_direct_refset_mapping(in_dict: dict) -> dict: return ret_dict + + +_IGNORE_TAG = '##IGNORE-THIS##' + + +class RefSetFileType(Enum): + concept = auto() + description = auto() + relationship = auto() + refset = auto() + + +@dataclass +class FileFormatDescriptor: + concept: str + description: str + relationship: str + refset: str + common_prefix: str = "sct2_" # for concept, description, and relationship (but not refset) + + @classmethod + def ignore_all(cls) -> 'FileFormatDescriptor': + return cls(concept=_IGNORE_TAG, description=_IGNORE_TAG, + relationship=_IGNORE_TAG, refset=_IGNORE_TAG) + + def get_file_per_type(self, file_type: RefSetFileType) -> str: + raw = self._get_raw(file_type) + return raw if file_type == RefSetFileType.refset else self.common_prefix + raw + + def _get_raw(self, file_type: RefSetFileType) -> str: + return getattr(self, file_type.name) + + def get_concept(self) -> str: + return self.get_file_per_type(RefSetFileType.concept) + + def get_description(self) -> str: + return self.get_file_per_type(RefSetFileType.description) + + def get_relationship(self) -> str: + return self.get_file_per_type(RefSetFileType.relationship) + + def get_refset(self) -> str: + return self.get_file_per_type(RefSetFileType.refset) + + +@dataclass +class ExtensionDescription: + exp_name_in_folder: str + exp_files: FileFormatDescriptor + exp_2nd_part_in_folder: Optional[str] = None + + +# pattern has: EXTENSION PRODUCTION RELEASE +SNOMED_FOLDER_NAME_PATTERN = re.compile("^SnomedCT_([A-Za-z0-9]+)_([A-Za-z0-9]+)_(\d{8}T\d{6}Z$)") +PER_FILE_TYPE_PATHS = { + RefSetFileType.concept: os.path.join("Snapshot", "Terminology"), + RefSetFileType.description: os.path.join("Snapshot", "Terminology"), + RefSetFileType.relationship: os.path.join("Snapshot", "Terminology"), + RefSetFileType.refset: os.path.join("Snapshot", "Refset", "Map"), +} + + + +class SupportedExtension(Enum): + INTERNATIONAL = ExtensionDescription( + exp_name_in_folder="InternationalRF2", + exp_files=FileFormatDescriptor( + concept="Concept_Snapshot", + description="Description_Snapshot-en", + relationship="Relationship_Snapshot", + # NOTE: the below will be ignored for UK_CLIN bundle + refset="der2_iisssccRefset_ExtendedMapSnapshot" + ), + ) + UK_CLINICAL = ExtensionDescription( + exp_name_in_folder="UKClinicalRF2", + exp_files=FileFormatDescriptor( + concept="Concept_UKCLSnapshot", + description="Description_UKCLSnapshot-en", + relationship="Relationship_UKCLSnapshot", + refset="der2_iisssciRefset_ExtendedMapUKCLSnapshot" + ), + ) + UK_CLINICAL_REFSET = ExtensionDescription( + exp_name_in_folder="UKClinicalRefsetsRF2", + exp_files=FileFormatDescriptor.ignore_all() + ) + UK_EDITION = ExtensionDescription( + exp_name_in_folder="UKEditionRF2", + exp_files=FileFormatDescriptor( + concept="Concept_UKEDSnapshot", + description="Description_UKEDSnapshot-en", + relationship="Relationship_UKEDSnapshot", + refset="der2_iisssciRefset_ExtendedMapUKEDSnapshot" + ), + ) + UK_DRUG = ExtensionDescription( + exp_name_in_folder="UKDrugRF2", + exp_files=FileFormatDescriptor( + concept="Concept_UKDGSnapshot", + description="Description_UKDGSnapshot-en", + relationship="Relationship_UKDGSnapshot", + refset="der2_iisssciRefset_ExtendedMapUKDGSnapshot", + ), + ) + AU = ExtensionDescription( + exp_name_in_folder="Release", + exp_2nd_part_in_folder="AU1000036", + exp_files=FileFormatDescriptor( + concept="Concept_Snapshot", + description="Description_Snapshot-en-AU", + relationship="Relationship_Snapshot", + refset=_IGNORE_TAG, + ), + ) + + +@dataclass +class BundleDescriptor: + extensions: List[SupportedExtension] + ignores: Dict[RefSetFileType, List[SupportedExtension]] = field(default_factory=dict) + + def has_invalid(self, ext: SupportedExtension, file_types: Tuple[RefSetFileType]) -> bool: + for ft in file_types: + if ft not in self.ignores: + continue + exts2ignore = self.ignores[ft] + if ext in exts2ignore: + return True + return False + + +class SupportedBundles(Enum): + UK_CLIN = BundleDescriptor( + extensions=[SupportedExtension.INTERNATIONAL, SupportedExtension.UK_CLINICAL, + SupportedExtension.UK_CLINICAL_REFSET, SupportedExtension.UK_EDITION], + ignores={RefSetFileType.refset: [SupportedExtension.INTERNATIONAL]} + ) + UK_DRUG_EXT = BundleDescriptor( + extensions=[SupportedExtension.UK_DRUG, SupportedExtension.UK_EDITION], + ) + + +def match_partials_with_folders(exp_names: List[Tuple[str, Optional[str]]], + folder_names: List[str], + _group_nr1: int = 1, _group_nr2: int = 2) -> bool: + if len(exp_names) > len(folder_names): + return False + available_folders = [os.path.basename(f) for f in folder_names] + for exp_name, exp_name_p2 in exp_names: + found_cur_name = False + for fi, folder in enumerate(available_folders): + m = SNOMED_FOLDER_NAME_PATTERN.match(folder) + if not m: + continue + if m.group(_group_nr1) != exp_name: + continue + if exp_name_p2 and m.group(_group_nr2) != exp_name_p2: + continue + found_cur_name = True + break + if found_cur_name: + available_folders.pop(fi) + else: + return False + return True + + class Snomed: """ Pre-process SNOMED CT release files. @@ -74,26 +245,71 @@ class Snomed: uk_drug_ext (bool, optional): Specifies whether the version is a SNOMED UK drug extension. Defaults to False. au_ext (bool, optional): Specifies whether the version is a AU release. Defaults to False. """ + NO_VERSION_DETECTED = 'N/A' - def __init__(self, data_path, uk_ext=False, uk_drug_ext=False, au_ext: bool = False): + def __init__(self, data_path): self.data_path = data_path - self.release = data_path[-16:-8] - self.uk_ext = uk_ext - self.uk_drug_ext = uk_drug_ext + self.bundle = self._determine_bundle(self.data_path) + self.paths, self.snomed_releases, self.exts = self._check_path_and_release() + + @classmethod + def _determine_bundle(cls, data_path) -> Optional[SupportedBundles]: + if not os.path.exists(data_path) or not os.path.isdir(data_path): + return None + for bundle in SupportedBundles: + folder_names = list(os.listdir(data_path)) + exp_names = [(ext.value.exp_name_in_folder, ext.value.exp_2nd_part_in_folder) + for ext in bundle.value.extensions] + if match_partials_with_folders(exp_names, folder_names): + return bundle + return None + + def _set_extension(self, release: str, extension: SupportedExtension) -> None: self.opcs_refset_id = "1126441000000105" - if ((self.uk_ext or self.uk_drug_ext) and + if (extension in (SupportedExtension.UK_CLINICAL, SupportedExtension.UK_DRUG) and # using lexicographical comparison below # e.g "20240101" > "20231122" results in True # yet "20231121" > "20231122" results in False - len(self.release) == len("20231122") and self.release >= "20231122"): + len(release) == len("20231122") and release >= "20231122"): # NOTE for UK extensions starting from 20231122 the # OPCS4 refset ID seems to be different self.opcs_refset_id = '1382401000000109' - self.au_ext = au_ext - # validate - if (self.uk_ext or self.uk_drug_ext) and self.au_ext: - raise ValueError("Cannot both be a UK and and a AU version. " - f"Got UK={uk_ext}, UK_Drug={uk_drug_ext}, AU={au_ext}") + self._extension = extension + + @classmethod + def _determine_extension(cls, folder_path: str, + _group_nr1: int = 1, _group_nr2: int = 2) -> SupportedExtension: + folder_basename = os.path.basename(folder_path) + m = SNOMED_FOLDER_NAME_PATTERN.match(folder_basename) + if not m: + raise UnkownSnomedReleaseException( + f"Unable to determine extension for path {repr(folder_path)}. " + f"Checking against pattern {SNOMED_FOLDER_NAME_PATTERN}") + ext_str = m.group(_group_nr1) + ext_str2 = m.group(_group_nr2) + for extension in SupportedExtension: + if extension.value.exp_name_in_folder != ext_str: + continue + if (extension.value.exp_2nd_part_in_folder and + extension.value.exp_2nd_part_in_folder != ext_str2): + continue + return extension + ext_names_folders = ",".join([f"{ext.name} ({ext.value.exp_name_in_folder})" + for ext in SupportedExtension]) + raise UnkownSnomedReleaseException( + f"Cannot Find the extension for {folder_path}. " + f"Tried the following extensions: {ext_names_folders}") + + @classmethod + def _determine_release(cls, folder_path: str, strict: bool = True, + _group_nr: int = 3, _keep_chars: int = 8) -> str: + folder_basename = os.path.basename(folder_path) + match = SNOMED_FOLDER_NAME_PATTERN.match(folder_basename) + if match is None and strict: + raise UnkownSnomedReleaseException(f"No version found in '{folder_path}'") + elif match is None: + return cls.NO_VERSION_DETECTED + return match.group(_group_nr)[:_keep_chars] def to_concept_df(self): """ @@ -106,37 +322,17 @@ def to_concept_df(self): Returns: pandas.DataFrame: SNOMED CT concept DataFrame. """ - paths, snomed_releases = self._check_path_and_release() df2merge = [] - for i, snomed_release in enumerate(snomed_releases): - contents_path = os.path.join(paths[i], "Snapshot", "Terminology") - concept_snapshot = "sct2_Concept_Snapshot" - description_snapshot = "sct2_Description_Snapshot-en" - if self.au_ext: - description_snapshot += "-AU" - if self.uk_ext: - if "SnomedCT_UKClinicalRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKCLSnapshot" - description_snapshot = "sct2_Description_UKCLSnapshot-en" - elif "SnomedCT_UKEditionRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKEDSnapshot" - description_snapshot = "sct2_Description_UKEDSnapshot-en" - elif "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION" in paths[i]: - continue - else: - pass - if self.uk_drug_ext: - if "SnomedCT_UKDrugRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKDGSnapshot" - description_snapshot = "sct2_Description_UKDGSnapshot-en" - elif "SnomedCT_UKEditionRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKEDSnapshot" - description_snapshot = "sct2_Description_UKEDSnapshot-en" - elif "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION" in paths[i]: - continue - else: - pass + for i, snomed_release in enumerate(self.snomed_releases): + self._set_extension(snomed_release, self.exts[i]) + contents_path = os.path.join(self.paths[i], PER_FILE_TYPE_PATHS[RefSetFileType.concept]) + concept_snapshot = self._extension.value.exp_files.get_concept() + description_snapshot = self._extension.value.exp_files.get_description() + if concept_snapshot in (None, _IGNORE_TAG) or ( + self.bundle and self.bundle.value.has_invalid( + self._extension, [RefSetFileType.concept, RefSetFileType.description])): + continue for f in os.listdir(contents_path): m = re.search(f'{concept_snapshot}'+r'_(.*)_\d*.txt', f) @@ -202,37 +398,16 @@ def list_all_relationships(self): Returns: list: List of all SNOMED CT relationships. """ - paths, snomed_releases = self._check_path_and_release() all_rela = [] - for i, snomed_release in enumerate(snomed_releases): - contents_path = os.path.join(paths[i], "Snapshot", "Terminology") - concept_snapshot = "sct2_Concept_Snapshot" - relationship_snapshot = "sct2_Relationship_Snapshot" - if self.uk_ext: - if "SnomedCT_InternationalRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_Snapshot" - relationship_snapshot = "sct2_Relationship_Snapshot" - elif "SnomedCT_UKClinicalRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKCLSnapshot" - relationship_snapshot = "sct2_Relationship_UKCLSnapshot" - elif "SnomedCT_UKEditionRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKEDSnapshot" - relationship_snapshot = "sct2_Relationship_UKEDSnapshot" - elif "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION" in paths[i]: - continue - else: - pass - if self.uk_drug_ext: - if "SnomedCT_UKDrugRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKDGSnapshot" - relationship_snapshot = "sct2_Relationship_UKDGSnapshot" - elif "SnomedCT_UKEditionRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKEDSnapshot" - relationship_snapshot = "sct2_Relationship_UKEDSnapshot" - elif "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION" in paths[i]: - continue - else: - pass + for i, snomed_release in enumerate(self.snomed_releases): + self._set_extension(snomed_release, self.exts[i]) + contents_path = os.path.join(self.paths[i], PER_FILE_TYPE_PATHS[RefSetFileType.concept]) + concept_snapshot = self._extension.value.exp_files.get_concept() + relationship_snapshot = self._extension.value.exp_files.get_relationship() + if concept_snapshot in (None, _IGNORE_TAG) or ( + self.bundle and self.bundle.value.has_invalid( + self._extension, [RefSetFileType.concept, RefSetFileType.description])): + continue for f in os.listdir(contents_path): m = re.search(f'{concept_snapshot}'+r'_(.*)_\d*.txt', f) @@ -259,37 +434,16 @@ def relationship2json(self, relationshipcode, output_jsonfile): Returns: file: JSON file of relationship mapping. """ - paths, snomed_releases = self._check_path_and_release() output_dict = {} - for i, snomed_release in enumerate(snomed_releases): - contents_path = os.path.join(paths[i], "Snapshot", "Terminology") - concept_snapshot = "sct2_Concept_Snapshot" - relationship_snapshot = "sct2_Relationship_Snapshot" - if self.uk_ext: - if "SnomedCT_InternationalRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_Snapshot" - relationship_snapshot = "sct2_Relationship_Snapshot" - elif "SnomedCT_UKClinicalRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKCLSnapshot" - relationship_snapshot = "sct2_Relationship_UKCLSnapshot" - elif "SnomedCT_UKEditionRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKEDSnapshot" - relationship_snapshot = "sct2_Relationship_UKEDSnapshot" - elif "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION" in paths[i]: - continue - else: - pass - if self.uk_drug_ext: - if "SnomedCT_UKDrugRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKDGSnapshot" - relationship_snapshot = "sct2_Relationship_UKDGSnapshot" - elif "SnomedCT_UKEditionRF2_PRODUCTION" in paths[i]: - concept_snapshot = "sct2_Concept_UKEDSnapshot" - relationship_snapshot = "sct2_Relationship_UKEDSnapshot" - elif "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION" in paths[i]: - continue - else: - pass + for i, snomed_release in enumerate(self.snomed_releases): + self._set_extension(snomed_release, self.exts[i]) + contents_path = os.path.join(self.paths[i], PER_FILE_TYPE_PATHS[RefSetFileType.concept]) + concept_snapshot = self._extension.value.exp_files.get_concept() + relationship_snapshot = self._extension.value.exp_files.get_relationship() + if concept_snapshot in (None, _IGNORE_TAG) or ( + self.bundle and self.bundle.value.has_invalid( + self._extension, [RefSetFileType.concept, RefSetFileType.description])): + continue for f in os.listdir(contents_path): m = re.search(f'{concept_snapshot}'+r'_(.*)_\d*.txt', f) @@ -322,7 +476,7 @@ def map_snomed2icd10(self): dict: A dictionary containing the SNOMED CT to ICD-10 mappings including metadata. """ snomed2icd10df = self._map_snomed2refset() - if self.uk_ext is True: + if self._extension in (SupportedExtension.UK_CLINICAL, SupportedExtension.UK_DRUG): return self._refset_df2dict(snomed2icd10df[0]) else: return self._refset_df2dict(snomed2icd10df) @@ -340,7 +494,7 @@ def map_snomed2opcs4(self) -> dict: Returns: dict: A dictionary containing the SNOMED CT to OPCS-4 mappings including metadata. """ - if self.uk_ext is not True: + if self._extension not in (SupportedExtension.UK_CLINICAL, SupportedExtension.UK_DRUG): raise AttributeError( "OPCS-4 mapping does not exist in this edition") snomed2opcs4df = self._map_snomed2refset()[1] @@ -361,17 +515,21 @@ def _check_path_and_release(self): """ snomed_releases = [] paths = [] + exts = [] if "Snapshot" in os.listdir(self.data_path): paths.append(self.data_path) - snomed_releases.append(self.release) + snomed_releases.append(self._determine_release(self.data_path, strict=True)) + exts.append(self._determine_extension(self.data_path)) else: for folder in os.listdir(self.data_path): if "SnomedCT" in folder: paths.append(os.path.join(self.data_path, folder)) - snomed_releases.append(folder[-16:-8]) + rel = self._determine_release(folder, strict=True) + snomed_releases.append(rel) + exts.append(self._determine_extension(paths[-1])) if len(paths) == 0: raise FileNotFoundError('Incorrect path to SNOMED CT directory') - return paths, snomed_releases + return paths, snomed_releases, exts def _refset_df2dict(self, refset_df: pd.DataFrame) -> dict: """ @@ -403,31 +561,15 @@ def _map_snomed2refset(self): OR tuple: Tuple of dataframes containing SNOMED CT to refset mappings and metadata (ICD-10, OPCS4), if uk_ext is True. """ - paths, snomed_releases = self._check_path_and_release() dfs2merge = [] - for i, snomed_release in enumerate(snomed_releases): - refset_terminology = f'{paths[i]}/Snapshot/Refset/Map' - icd10_ref_set = 'der2_iisssccRefset_ExtendedMapSnapshot' - if self.uk_ext: - if "SnomedCT_InternationalRF2_PRODUCTION" in paths[i]: - continue - elif "SnomedCT_UKClinicalRF2_PRODUCTION" in paths[i]: - icd10_ref_set = "der2_iisssciRefset_ExtendedMapUKCLSnapshot" - elif "SnomedCT_UKEditionRF2_PRODUCTION" in paths[i]: - icd10_ref_set = "der2_iisssciRefset_ExtendedMapUKEDSnapshot" - elif "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION" in paths[i]: - continue - else: - pass - if self.uk_drug_ext: - if "SnomedCT_UKDrugRF2_PRODUCTION" in paths[i]: - icd10_ref_set = "der2_iisssciRefset_ExtendedMapUKDGSnapshot" - elif "SnomedCT_UKEditionRF2_PRODUCTION" in paths[i]: - icd10_ref_set = "der2_iisssciRefset_ExtendedMapUKEDSnapshot" - elif "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION" in paths[i]: - continue - else: - pass + for i, snomed_release in enumerate(self.snomed_releases): + self._set_extension(snomed_release, self.exts[i]) + refset_terminology = os.path.join(self.paths[i], PER_FILE_TYPE_PATHS[RefSetFileType.refset]) + icd10_ref_set = self._extension.value.exp_files.get_refset() + if icd10_ref_set in (None, _IGNORE_TAG) or ( + self.bundle and self.bundle.value.has_invalid( + self._extension, [RefSetFileType.concept, RefSetFileType.description])): + continue for f in os.listdir(refset_terminology): m = re.search(f'{icd10_ref_set}'+r'_(.*)_\d*.txt', f) if m: @@ -440,10 +582,16 @@ def _map_snomed2refset(self): dfs2merge.append(icd_mappings) mapping_df = pd.concat(dfs2merge) del dfs2merge - if self.uk_ext or self.uk_drug_ext: + if self._extension in (SupportedExtension.UK_CLINICAL, SupportedExtension.UK_DRUG): opcs_df = mapping_df[mapping_df['refsetId'] == self.opcs_refset_id] icd10_df = mapping_df[mapping_df['refsetId'] == '999002271000000101'] return icd10_df, opcs_df else: return mapping_df + + +class UnkownSnomedReleaseException(ValueError): + + def __init__(self, *args) -> None: + super().__init__(*args) diff --git a/tests/utils/test_preprocess_snomed.py b/tests/utils/test_preprocess_snomed.py index 59a00f6fc..a133acdf8 100644 --- a/tests/utils/test_preprocess_snomed.py +++ b/tests/utils/test_preprocess_snomed.py @@ -1,7 +1,11 @@ +import os from typing import Dict +import contextlib + from medcat.utils import preprocess_snomed import unittest +from unittest.mock import patch EXAMPLE_REFSET_DICT: Dict = { @@ -45,20 +49,122 @@ def test_example_no_codfe_fails(self): with self.assertRaises(KeyError): preprocess_snomed.get_direct_refset_mapping(EXAMPLE_REFSET_DICT_NO_CODE) + EXAMPLE_SNOMED_PATH_OLD = "SnomedCT_InternationalRF2_PRODUCTION_20220831T120000Z" EXAMPLE_SNOMED_PATH_NEW = "SnomedCT_UKClinicalRF2_PRODUCTION_20231122T000001Z" -class TestSnomedVersionsOPCS4(unittest.TestCase): +@contextlib.contextmanager +def patch_fake_files(path: str, subfiles: list = [], + subdirs: list = ["Snapshot"]): + def cur_listdir(file_path: str, *args, **kwargs) -> list: + if file_path == path: + return subfiles + subdirs + for sd in subdirs: + subdir = os.path.join(path, sd) + if subdir == path: + return [] + raise FileNotFoundError(path) - def test_old_gets_old_OPCS4_mapping_nonuk_ext(self): - snomed = preprocess_snomed.Snomed(EXAMPLE_SNOMED_PATH_OLD, uk_ext=False) - self.assertEqual(snomed.opcs_refset_id, "1126441000000105") + def cur_isfile(file_path: str, *args, **kwargs) -> bool: + print("CUR isfile", file_path) + return file_path == path or file_path in [os.path.join(path, subfiles)] + + def cur_isdir(file_path: str, *args, **kwrags) -> bool: + print("CUR isdir", file_path) + return file_path == path or file_path in [os.path.join(path, subdirs)] + + with patch("os.listdir", new=cur_listdir): + with patch("os.path.isfile", new=cur_isfile): + with patch("os.path.isdir", new=cur_isdir): + yield - def test_old_gets_old_OPCS4_mapping_uk_ext(self): - snomed = preprocess_snomed.Snomed(EXAMPLE_SNOMED_PATH_OLD, uk_ext=True) + +class TestSnomedVersionsOPCS4(unittest.TestCase): + + def test_old_gets_old_OPCS4_mapping(self): + with patch_fake_files(EXAMPLE_SNOMED_PATH_OLD): + snomed = preprocess_snomed.Snomed(EXAMPLE_SNOMED_PATH_OLD) + snomed._set_extension(snomed._determine_release(EXAMPLE_SNOMED_PATH_OLD), + snomed._determine_extension(EXAMPLE_SNOMED_PATH_OLD)) self.assertEqual(snomed.opcs_refset_id, "1126441000000105") - def test_new_gets_new_OCPS4_mapping_uk_ext(self): - snomed = preprocess_snomed.Snomed(EXAMPLE_SNOMED_PATH_NEW, uk_ext=True) + def test_new_gets_new_OCPS4_mapping(self): + with patch_fake_files(EXAMPLE_SNOMED_PATH_NEW): + snomed = preprocess_snomed.Snomed(EXAMPLE_SNOMED_PATH_NEW) + snomed._set_extension(snomed._determine_release(EXAMPLE_SNOMED_PATH_NEW), + snomed._determine_extension(EXAMPLE_SNOMED_PATH_NEW)) self.assertEqual(snomed.opcs_refset_id, "1382401000000109") + + +class TestSnomedModelGetter(unittest.TestCase): + WORKING_BASE_NAMES = [ + "SnomedCT_InternationalRF2_PRODUCTION_20240201T120000Z", + "SnomedCT_InternationalRF2_PRODUCTION_20240601T120000Z", + "SnomedCT_UKClinicalRF2_PRODUCTION_20240410T000001Z", + "SnomedCT_UKClinicalRefsetsRF2_PRODUCTION_20240410T000001Z", + "SnomedCT_UKDrugRF2_PRODUCTION_20240508T000001Z", + "SnomedCT_UKEditionRF2_PRODUCTION_20240410T000001Z", + "SnomedCT_UKEditionRF2_PRODUCTION_20240508T000001Z", + "SnomedCT_Release_AU1000036_20240630T120000Z", + ] + FAILING_BASE_NAMES = [ + "uk_sct2cl_38.2.0_20240605000001Z", + "uk_sct2cl_32.6.0_20211027000001Z", + ] + PATH = os.path.join("path", "to", "release") + + def _pathify(self, in_list: list) -> list: + return [os.path.join(self.PATH, folder) for folder in in_list] + + def assert_got_version(self, snomed: preprocess_snomed.Snomed, raw_name: str): + rel_list = snomed.snomed_releases + self.assertIsInstance(rel_list, list) + self.assertEqual(len(rel_list), 1) + rel = rel_list[0] + self.assertIsInstance(rel, str) + self.assertIn(rel, raw_name) + self.assertEqual(rel, raw_name[-16:-8]) + + def assert_all_work(self, all_paths: list): + for path in all_paths: + with self.subTest(f"Rrelease name: {path}"): + with patch_fake_files(path): + snomed = preprocess_snomed.Snomed(path) + self.assert_got_version(snomed, path) + + def test_gets_model_form_basename(self): + self.assert_all_work(self.WORKING_BASE_NAMES) + + def test_gets_model_from_path(self): + full_paths = self._pathify(self.WORKING_BASE_NAMES) + self.assert_all_work(full_paths) + + def assert_raises(self, folder_path: str): + with self.assertRaises(preprocess_snomed.UnkownSnomedReleaseException): + preprocess_snomed.Snomed._determine_release(folder_path, strict=True) + + def assert_all_raise(self, folder_paths: list): + for folder_path in folder_paths: + with self.subTest(f"Folder: {folder_path}"): + self.assert_raises(folder_path) + + def test_fails_on_incorrect_names_strict(self): + self.assert_all_raise(self.FAILING_BASE_NAMES) + + def test_fails_on_incorrect_paths_strict(self): + full_paths = self._pathify(self.FAILING_BASE_NAMES) + self.assert_all_raise(full_paths) + + def assert_all_get_no_version(self, folder_paths: list): + for folder_path in folder_paths: + with self.subTest(f"Folder: {folder_path}"): + det_rel = preprocess_snomed.Snomed._determine_release(folder_path, strict=False) + self.assertEqual(det_rel, preprocess_snomed.Snomed.NO_VERSION_DETECTED) + + def test_gets_no_version_incorrect_names_nonstrict(self): + self.assert_all_get_no_version(self.FAILING_BASE_NAMES) + + def test_gets_no_version_incorrect_paths_nonstrict(self): + full_paths = self._pathify(self.FAILING_BASE_NAMES) + self.assert_all_get_no_version(full_paths)