"""Parser for raw DNA match data."""

from __future__ import annotations

import itertools
import re
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, Sequence, overload

if TYPE_CHECKING:
    # MatchSegment is referenced in annotations only, and annotations are
    # evaluated lazily under the ``annotations`` future import, so the parser
    # does not need this import at runtime.
    from gramps_webapi.types import MatchSegment

SIDE_UNKNOWN = "U"
SIDE_MATERNAL = "M"
SIDE_PATERNAL = "P"

# Candidate field delimiters, in order of precedence.
_DELIMITERS = ("\t", ",", ";")


@dataclass
class SegmentColumnOrder:
    """Order of the columns of a DNA match table.

    Every attribute is the zero-based index of the respective column.
    Optional columns are ``None`` when the table does not provide them.
    """

    chromosome: int
    start_position: int
    end_position: int
    centimorgans: int
    num_snps: int | None = None
    side: int | None = None
    comment: int | None = None


def get_delimiter(rows: list[str]) -> str:
    """Guess the delimiter of a CSV-like table from its first row.

    It is assumed that the table has at least 4 columns and at least one
    row, i.e. the first row contains the delimiter at least 3 times. Tab
    takes precedence over comma, which takes precedence over semicolon.

    Raises:
        ValueError: If ``rows`` is empty or no candidate delimiter occurs
            at least 3 times in the first row.
    """
    if not rows:
        # Raise the exception type callers already handle, not IndexError.
        raise ValueError("Could not determine delimiter.")
    first_row = rows[0]
    for delimiter in _DELIMITERS:
        if first_row.count(delimiter) >= 3:
            return delimiter
    raise ValueError("Could not determine delimiter.")


def is_numeric(value: str) -> bool:
    """Determine if a string is number-like.

    A string is number-like if ``float`` accepts it, or if it starts with
    a digit and consists only of digits, dots and commas (which covers
    numbers with thousands separators such as ``"69.231.796"``).
    """
    if value == "":
        return False
    try:
        float(value)
    except ValueError:
        pass
    else:
        return True
    return re.match(r"^\d[\d\.,]*$", value) is not None


def cast_int(value: str) -> int:
    """Cast a string to an integer, dropping "," and "." separators.

    Returns 0 if the remaining string is not a valid integer.
    """
    try:
        return int(value.replace(",", "").replace(".", ""))
    except (ValueError, TypeError):
        return 0


def cast_float(value: str) -> float:
    """Cast a string to a float, tolerating locale-specific separators.

    Spaces are removed. A separator character ("." or ",") that occurs
    more than once is treated as a thousands separator and removed. A
    single comma without any dot is treated as the decimal separator.
    Returns 0.0 if the result is not a valid float.
    """
    value = value.replace(" ", "")
    if value.count(".") > 1:
        value = value.replace(".", "")
    if value.count(",") > 1:
        value = value.replace(",", "")
    if value.count(",") == 1 and value.count(".") == 0:
        value = value.replace(",", ".")
    try:
        return float(value)
    except ValueError:
        return 0.0


def has_header(rows: list[str], delimiter: str) -> bool:
    """Determine if the table has a header.

    The first row is considered a header if the table has at least two
    rows and the first row has at least 4 columns, none of which is
    number-like.
    """
    if len(rows) < 2:
        return False
    header_columns = rows[0].split(delimiter)
    # Count columns, not characters: a table needs at least 4 columns.
    if len(header_columns) < 4:
        return False
    return not any(is_numeric(column) for column in header_columns)


@overload
def find_column_position(
    column_names: list[str],
    condition: Callable[[str], bool],
    exclude_indices: Sequence[int],
    allow_missing: Literal[False],
) -> int: ...


@overload
def find_column_position(
    column_names: list[str],
    condition: Callable[[str], bool],
    exclude_indices: Sequence[int],
    allow_missing: Literal[True],
) -> int | None: ...


def find_column_position(
    column_names: list[str],
    condition: Callable[[str], bool],
    exclude_indices: Sequence[int],
    allow_missing: bool = False,
) -> int | None:
    """Find the position of the first column whose name meets a condition.

    The condition is evaluated on the lower-cased, stripped column name.
    Columns whose index is in ``exclude_indices`` are skipped.

    Returns:
        The index of the first matching column, or ``None`` if there is
        none and ``allow_missing`` is true.

    Raises:
        ValueError: If no column matches and ``allow_missing`` is false.
    """
    for i, column in enumerate(column_names):
        if i in exclude_indices:
            continue
        if condition(column.lower().strip()):
            return i
    if allow_missing:
        return None
    raise ValueError("Column not found.")


def _is_side_column(values: Sequence[str | None]) -> bool:
    """Check whether every non-empty value of a column is a side code."""
    allowed = {"", SIDE_MATERNAL, SIDE_PATERNAL, SIDE_UNKNOWN}
    # Strip before comparing: with CRLF line endings the last field of a
    # row carries a trailing "\r" that must not hide a valid side code.
    return all(value is None or value.strip() in allowed for value in values)


def get_order(
    header: list[str] | None, data_columns: Sequence[Sequence[str | None]]
) -> SegmentColumnOrder:
    """Get the order of the columns.

    Without a header, the default ordering of the DNASegmentMap Gramplet
    is assumed; the 6th column is taken as the side column if all of its
    non-empty values are side codes, otherwise as the comment column.
    With a header, columns are identified by keywords in their names.

    Raises:
        ValueError: If a header is given but the chromosome, start, end
            or centimorgan column cannot be identified.
    """
    if header is None:
        # use the default ordering of the DNASegmentMap Gramplet
        # https://gramps-project.org/wiki/index.php/Addon:DNASegmentMapGramplet
        if len(data_columns) >= 6 and _is_side_column(data_columns[5]):
            return SegmentColumnOrder(
                chromosome=0,
                start_position=1,
                end_position=2,
                centimorgans=3,
                num_snps=4,
                side=5,
                comment=6,
            )
        return SegmentColumnOrder(
            chromosome=0,
            start_position=1,
            end_position=2,
            centimorgans=3,
            num_snps=4,
            comment=5,
        )
    # Columns are matched one after another; every matched index is
    # excluded from the subsequent searches.
    exclude_indices: list[int] = []
    chromosome = find_column_position(
        header,
        lambda col: col.startswith("chr"),
        exclude_indices=exclude_indices,
        allow_missing=False,
    )
    exclude_indices.append(chromosome)
    start_position = find_column_position(
        header,
        lambda col: "start" in col,
        exclude_indices=exclude_indices,
        allow_missing=False,
    )
    exclude_indices.append(start_position)
    end_position = find_column_position(
        header,
        lambda col: "end" in col
        or "stop" in col
        or ("length" in col and "morgan" not in col),
        exclude_indices=exclude_indices,
        allow_missing=False,
    )
    exclude_indices.append(end_position)
    centimorgans = find_column_position(
        header,
        lambda col: col.startswith("cm") or "centimorgan" in col or "length" in col,
        exclude_indices=exclude_indices,
        allow_missing=False,
    )
    exclude_indices.append(centimorgans)
    num_snps = find_column_position(
        header,
        lambda col: "snp" in col,
        exclude_indices=exclude_indices,
        allow_missing=True,
    )
    if num_snps is not None:
        exclude_indices.append(num_snps)
    side = find_column_position(
        header,
        lambda col: col.startswith("side"),
        exclude_indices=exclude_indices,
        allow_missing=True,
    )
    if side is not None:
        exclude_indices.append(side)
    comment = find_column_position(
        header,
        lambda _: True,  # take the first column that has not been matched yet
        exclude_indices=exclude_indices,
        allow_missing=True,
    )
    return SegmentColumnOrder(
        chromosome=chromosome,
        start_position=start_position,
        end_position=end_position,
        centimorgans=centimorgans,
        num_snps=num_snps,
        side=side,
        comment=comment,
    )


def transpose_jagged_nested_list(
    data: Sequence[Sequence[str | None]],
) -> list[list[str | None]]:
    """Transpose a jagged nested list, replacing missing values with None."""
    return list(map(list, itertools.zip_longest(*data, fillvalue=None)))


def parse_raw_dna_match_string(raw_string: str) -> list[MatchSegment]:
    """Parse a raw DNA match string.

    The delimiter, the presence of a header row and the column order are
    detected automatically. Blank rows and rows that cannot be processed
    are skipped.

    Returns:
        One segment per parsed row; an empty list if the delimiter or the
        column order cannot be determined.
    """
    rows = raw_string.strip().split("\n")
    try:
        delimiter = get_delimiter(rows)
    except ValueError:
        return []
    header: list[str] | None = None
    if has_header(rows, delimiter):
        header = rows[0].split(delimiter)
        rows = rows[1:]
    # Split every row once; the fields are used for both the column
    # detection and the per-row processing.
    data = [row.split(delimiter) for row in rows]
    try:
        order = get_order(header, data_columns=transpose_jagged_nested_list(data))
    except ValueError:
        return []
    segments: list[MatchSegment] = []
    for row, fields in zip(rows, data):
        if not row.strip():
            continue
        try:
            match_segment = process_row(fields=fields, order=order)
        except (ValueError, TypeError, IndexError):
            # A single malformed row must not abort the whole parse.
            continue
        if match_segment:
            segments.append(match_segment)
    return segments


def _optional_field(fields: list[str], index: int | None) -> str | None:
    """Return the stripped field at ``index``, or None if it does not exist."""
    if index is None or index >= len(fields):
        return None
    return fields[index].strip()


def process_row(fields: list[str], order: SegmentColumnOrder) -> MatchSegment | None:
    """Process a row of a DNA match table.

    Returns:
        The segment as a dictionary, or ``None`` if the row has fewer than
        4 fields or lacks any of the required columns (chromosome, start,
        end, centimorgans). Missing optional columns yield the defaults
        0 (SNPs), unknown (side) and "" (comment).
    """
    required = (
        order.chromosome,
        order.start_position,
        order.end_position,
        order.centimorgans,
    )
    # Required columns may lie beyond index 3 (depending on the header), so
    # the row must be long enough for the largest required index.
    if len(fields) < 4 or len(fields) <= max(required):
        return None
    try:
        chromo = fields[order.chromosome].strip()
        start = cast_int(fields[order.start_position].strip())
        stop = cast_int(fields[order.end_position].strip())
        cms = cast_float(fields[order.centimorgans].strip())
        raw_snp = _optional_field(fields, order.num_snps)
        snp = cast_int(raw_snp) if raw_snp is not None else 0
        raw_side = _optional_field(fields, order.side)
        side = raw_side.upper() if raw_side is not None else SIDE_UNKNOWN
        if side not in {SIDE_MATERNAL, SIDE_PATERNAL}:
            side = SIDE_UNKNOWN
        raw_comment = _optional_field(fields, order.comment)
        comment = raw_comment if raw_comment is not None else ""
    except (ValueError, TypeError):
        return None
    return {
        "chromosome": chromo,
        "start": start,
        "stop": stop,
        "side": side,
        "cM": cms,
        "SNPs": snp,
        "comment": comment,
    }
def parse_raw_match_string_with_default_side(
    raw_string: str, side: str | None = None
) -> list[MatchSegment]:
    """Parse a raw DNA match string, optionally filling in unknown sides.

    The string is parsed with ``parse_raw_dna_match_string``. If ``side``
    is given, every segment whose side is unknown is replaced by a copy
    carrying ``side`` instead; all other segments are passed through as
    they are. If ``side`` is ``None``, the parsed segments are returned
    unchanged.
    """
    parsed = parse_raw_dna_match_string(raw_string)
    if side is None:
        return parsed
    return [
        {**segment, "side": side} if segment["side"] == SIDE_UNKNOWN else segment
        for segment in parsed
    ]
"""Test the DNA match parser."""

from gramps_webapi.api.dna import parse_raw_dna_match_string


def _tab_table(rows):
    """Join rows of fields into a tab-delimited table.

    The tabs are written explicitly so that they cannot be lost to
    whitespace normalization of this file.
    """
    return "\n".join("\t".join(row) for row in rows) + "\n"


def test_gramplet_form():
    """Test the format supported by the Gramplet."""
    string = """Chromosome,Start Location,End Location,Centimorgans,Matching SNPs,Name,Match Name
3,56950055,64247327,10.9,375,Luther Robinson, Robert F. Garner
11,25878681,35508918,9.9,396
"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 2
    assert segments[0] == {
        "chromosome": "3",
        "start": 56950055,
        "stop": 64247327,
        "side": "U",
        "cM": 10.9,
        "SNPs": 375,
        "comment": "Luther Robinson",
    }
    assert segments[1] == {
        "chromosome": "11",
        "start": 25878681,
        "stop": 35508918,
        "side": "U",
        "cM": 9.9,
        "SNPs": 396,
        "comment": "",
    }


def test_gramplet_form_with_tabs():
    """Test the format supported by the Gramplet with tabs."""
    string = _tab_table(
        [
            [
                "Chromosome",
                "Start Location",
                "End Location",
                "Centimorgans",
                "Matching SNPs",
                "Name",
                "Match Name",
            ],
            [
                "3",
                "56950055",
                "64247327",
                "10.9",
                "375",
                "Luther Robinson",
                "Robert F. Garner",
            ],
            ["11", "25878681", "35508918", "9.9", "396"],
        ]
    )
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 2
    assert segments[0] == {
        "chromosome": "3",
        "start": 56950055,
        "stop": 64247327,
        "side": "U",
        "cM": 10.9,
        "SNPs": 375,
        "comment": "Luther Robinson",
    }
    assert segments[1] == {
        "chromosome": "11",
        "start": 25878681,
        "stop": 35508918,
        "side": "U",
        "cM": 9.9,
        "SNPs": 396,
        "comment": "",
    }


def test_gramplet_form_without_header():
    """Test the format supported by the Gramplet without header."""
    string = """3,56950055,64247327,10.9,375,Luther Robinson, Robert F. Garner
11,25878681,35508918,9.9,396
"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 2
    assert segments[0] == {
        "chromosome": "3",
        "start": 56950055,
        "stop": 64247327,
        "side": "U",
        "cM": 10.9,
        "SNPs": 375,
        "comment": "Luther Robinson",
    }
    assert segments[1] == {
        "chromosome": "11",
        "start": 25878681,
        "stop": 35508918,
        "side": "U",
        "cM": 9.9,
        "SNPs": 396,
        "comment": "",
    }


def test_myheritage_format():
    """Test the MyHeritage CSV format."""
    string = """Name,Match Name,Chromosome,Start Location,End Location,Start RSID,End RSID,Centimorgans,SNPs
John Doe,Jane Doe,10,11830498,29606974,rs7924203,rs11007524,27.7,11520
John Doe,Jane Doe,10,50018165,82402437,rs2928402,rs4934387,36.6,17920"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 2
    assert segments[0] == {
        "chromosome": "10",
        "start": 11830498,
        "stop": 29606974,
        "side": "U",
        "cM": 27.7,
        "SNPs": 11520,
        "comment": "John Doe",
    }
    assert segments[1] == {
        "chromosome": "10",
        "start": 50018165,
        "stop": 82402437,
        "side": "U",
        "cM": 36.6,
        "SNPs": 17920,
        "comment": "John Doe",
    }


def test_gedmatch_german_locale():
    """Test the tab-delimited Gedmatch format with German locale."""
    string = _tab_table(
        [
            [
                "Chr",
                "B37 Start Pos'n",
                "B37 End Pos'n",
                "Centimorgans (cM)",
                "SNPs",
                "Segment threshold",
                "Bunch limit",
                "SNP Density Ratio",
            ],
            ["11", "69.231.796", "83.487.889", "15,5", "2.157", "210", "126", "0,29"],
            ["11", "130.347.190", "133.862.526", "11,1", "977", "204", "122", "0,34"],
        ]
    )
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 2
    assert segments[0] == {
        "chromosome": "11",
        "start": 69231796,
        "stop": 83487889,
        "side": "U",
        "cM": 15.5,
        "SNPs": 2157,
        "comment": "210",
    }
    assert segments[1] == {
        "chromosome": "11",
        "start": 130347190,
        "stop": 133862526,
        "side": "U",
        "cM": 11.1,
        "SNPs": 977,
        "comment": "204",
    }


def test_geneatnet_format():
    """Test the Geneanet CSV format (semicolon-delimited)."""
    string = """Chromosome;Start of segment;Length of segment;Number of SNPs;Length in centimorgan (cM);Type of segment
9;14037831;73101159;6804;38.64;half-identical
"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 1
    assert segments[0] == {
        "chromosome": "9",
        "start": 14037831,
        "stop": 73101159,
        "side": "U",
        "cM": 38.64,
        "SNPs": 6804,
        "comment": "half-identical",
    }


def test_with_whitespace():
    """Test that whitespace around the fields is ignored."""
    string = """
    Chromosome, Start\t, End, \tcM
    3, 56950055, 64247327, 10.9"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 1
    assert segments[0] == {
        "chromosome": "3",
        "start": 56950055,
        "stop": 64247327,
        "side": "U",
        "cM": 10.9,
        "SNPs": 0,
        "comment": "",
    }


def test_three_columns():
    """Test that a table with fewer than four columns yields no segments."""
    string = """Chromosome,Start Location,End Location
3,56950055,64247327"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 0


def test_integer_with_separators():
    """Test positions written with dots as thousands separators."""
    string = """Chromosome,Start Location,End Location,Centimorgans
3,56.950.055,64.247.327,10.9"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 1
    assert segments[0] == {
        "chromosome": "3",
        "start": 56950055,
        "stop": 64247327,
        "side": "U",
        "cM": 10.9,
        "SNPs": 0,
        "comment": "",
    }


def test_integer_with_separators_tab():
    """Test comma thousands separators in a tab-delimited table."""
    string = """Chromosome\tStart Location\tEnd Location\tCentimorgans
3\t56,950,055\t64,247,327\t10.9"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 1
    assert segments[0] == {
        "chromosome": "3",
        "start": 56950055,
        "stop": 64247327,
        "side": "U",
        "cM": 10.9,
        "SNPs": 0,
        "comment": "",
    }


def test_non_castable_integer():
    """Test that a position that is not an integer falls back to 0."""
    string = """Chromosome,Start Location,End Location,Centimorgans
3,56950055,64247327a,10.9"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 1
    assert segments[0] == {
        "chromosome": "3",
        "start": 56950055,
        "stop": 0,
        "side": "U",
        "cM": 10.9,
        "SNPs": 0,
        "comment": "",
    }


def test_non_castable_float():
    """Test that a cM value that is not a number falls back to 0."""
    string = """Chromosome,Start Location,End Location,Centimorgans
3,56950055,64247327,10.9a"""
    segments = parse_raw_dna_match_string(string)
    assert len(segments) == 1
    assert segments[0] == {
        "chromosome": "3",
        "start": 56950055,
        "stop": 64247327,
        "side": "U",
        "cM": 0,
        "SNPs": 0,
        "comment": "",
    }