Skip to content

Commit

Permalink
feat(diplotype-comparator): find equivalent phased alleles
Browse files Browse the repository at this point in the history
  • Loading branch information
BinglanLi committed May 7, 2024
1 parent 79fb6ea commit a8a786c
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 111 deletions.
17 changes: 16 additions & 1 deletion src/scripts/diplotype_comparison/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,26 @@ The default output is a file also named `predicted_pharmcat_calls.tsv`. The file


### 2. with missing positions

Find possible alternative calls for every possible combination of missing positions. Note, to spped up the tool, the script only focuses on positions whose missing will lead to ambiguous calls.
```shell
python3 compare_diplotype_definition.py -m
```

Use the following command if you want to find alternative calls when specific positions are missing. At the moment, the script takes a VCF file and regards all positions in the VCF missing.
```shell
python3 compare_diplotype_definition.py -m -M missing_position.vcf
```

### 3. for phased data

Find alternative calls for phased data. This is specifically designed for phased data that have missing positions. Use `-M <missing_vcf>` if you want to specify the missing positions.

```shell
python3 compare_diplotype_definition.py -p
```



### 3. filter failed tests

Run the following code to pass autogenerated tests. Note that you need to update `test_file_pattern` to denote what autogenerated tests you are evaluating.
Expand Down
130 changes: 95 additions & 35 deletions src/scripts/diplotype_comparison/compare_diplotype_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from glob import glob
from pathlib import Path
from timeit import default_timer as timer

import numpy as np
import pandas as pd

Expand All @@ -23,11 +22,17 @@
type=str,
required=False,
help="File name or the directory to the PharmCAT JSONs of allele definitions.")
# input arguments
parser.add_argument("-m", "--missing",
action='store_true',
required=False,
help="To evaluate diplotypes with missing positions, True or False.")
help="(Optional) to evaluate diplotypes with missing positions, True or False.")
parser.add_argument("-M", "--missing-position-file",
required=False,
help="(Optional) file of missing positions in VCF format.")
parser.add_argument("-p", "--phased",
action='store_true',
required=False,
help="(Optional) whether input is phased, True or False. If phased, only compare alleles.")

# output args
parser.add_argument("-o", "--output-dir",
Expand All @@ -50,13 +55,37 @@
args = parser.parse_args()

try:
# define whether to process as phased
m_phased: bool = args.phased
# define whether to evaluate missing positions
m_missing: bool = args.missing
# define whether to produce results for clinical calls
m_clinical_calls: bool = args.clinical_calls
# checks for boolean parameters
if m_phased and not m_missing:
print(f'\'--phased\' must work with \'--missing\' or \'--missing-position-file\'')
sys.exit(1)

# define output file name
m_output_basename: str = args.base_filename
# define output file path
m_output_dir: Path = Path(args.output_dir) if args.output_dir else Path.cwd()
if not args.output_dir:
print(f'Output directory is not specified.\n'
f'\tUse the current working directory: {m_output_dir}')
# define output file name
if m_missing or args.missing_position_file:
m_output_file: Path = m_output_dir / (m_output_basename + '_missing.tsv')
else:
m_output_file: Path = m_output_dir / (m_output_basename + '.tsv')
# delete file
m_output_file.unlink(missing_ok=True)
print()
if m_clinical_calls:
print(f'Generating clinical call data')
if args.base_filename == 'predicted_pharmcat_calls':
m_output_file: Path = m_output_dir / 'clinical_calls_diff.tsv'
print(f'Saving to {m_output_file}\n')

# define input
m_input: Path
Expand Down Expand Up @@ -84,18 +113,27 @@
print(f'No allele definition JSON files are found under {m_input}')
sys.exit(1)

if args.missing:
m_output_file: Path = m_output_dir / (m_output_basename + '_missing.tsv')
else:
m_output_file: Path = m_output_dir / (m_output_basename + '.tsv')
# delete file
m_output_file.unlink(missing_ok=True)
print()
if args.clinical_calls:
print(f'Generating clinical call data')
if args.base_filename == 'predicted_pharmcat_calls':
m_output_file: Path = m_output_dir / 'clinical_calls_diff.tsv'
print(f'Saving to {m_output_file}\n')
# read the VCF file of missing positions if provides
provided_missing_positions: list[str] = []
if args.missing_position_file:
m_missing_position_file: Path = Path(args.missing_position_file).resolve()
if m_missing_position_file.is_file():
with open(m_missing_position_file, 'r') as mf:
for line in mf:
if line[0] == '#':
continue
else:
line = line.rstrip('\n')
fields = line.split('\t')
missing_position: str = fields[0] + '_' + str(fields[1])
if missing_position not in provided_missing_positions:
provided_missing_positions.append(missing_position)
else:
print(f'Cannot find the VCF of missing positions at {m_missing_position_file}')
# warn and exit if the VCF has no entries
if len(provided_missing_positions) == 0:
print(f'There is no entries of genomic positions in {m_missing_position_file}')
sys.exit(1)

# process each gene
for json_file in allele_definition_jsons:
Expand All @@ -105,7 +143,7 @@
# get the gene name
gene: str = filename.split('_')[0]

if args.clinical_calls and (gene == 'CYP2D6' or gene == 'DPYD'):
if m_clinical_calls and (gene == 'CYP2D6' or gene == 'DPYD'):
continue

# read files
Expand Down Expand Up @@ -139,49 +177,71 @@
print(f'\tSkipping - no alleles share definitions with others')
continue

# find all possible outcomes of alternative calls for each diplotype
# find all possible alternative calls
dict_predicted_calls = dict()
if args.missing:
if m_missing:
# skip certain genes
if gene == 'CYP2D6':
print(f'\tSkipping - too complex')
continue

if gene == 'RYR1':
elif gene == 'RYR1':
print(f'\tSkipping - not necessary')
continue

# identify the combinations of missing positions to evaluate
hgvs_names = [*allele_defining_variants]
missing_combinations = util.find_missingness_combinations(allele_position_array, list(allele_names),
hgvs_names, reference_alleles,
gregarious_alleles)
# find all combinations of missing positions
missing_combinations: list[list[str]] = []
if len(provided_missing_positions):
# get the hgvs names of the missing position
missing_combination: list[str] = []
for hgvs_name, dict_genomic_position in allele_defining_variants.items():
chr_pos_name: str = (dict_genomic_position['chromosome'] + '_' +
str(dict_genomic_position['position']))
if chr_pos_name in provided_missing_positions:
missing_combination.append(hgvs_name)
# add the VCF missing positions to the list
missing_combinations.append(missing_combination)
elif m_missing:
# skip certain genes
if gene in ['CYP2D6', 'RYR1']:
continue
# identify the combinations of missing positions to evaluate
hgvs_names = [*allele_defining_variants]
missing_combinations = util.find_missingness_combinations(
allele_position_array, list(allele_names), hgvs_names, reference_alleles, gregarious_alleles)

# if the gene does not have positions whose absence will cause ambiguous pharmcat calls, then skip
if not missing_combinations:
# find alternative calls when certain genomic positions are missing
if len(missing_combinations) == 0:
# if the gene does not have positions whose absence will cause ambiguous pharmcat calls, then skip
print(f'\tSkipping - no ambiguous calls')
continue
else:
print(f'\tNumber of combinations = {len(missing_combinations)}')
for m in missing_combinations:
# find possible calls for each missing position
dict_predicted_calls_m = util.predict_pharmcat_calls(allele_defining_variants,
allele_definitions, gregarious_alleles, m)

dict_predicted_calls_m = util.predict_pharmcat_calls(
dict_allele_defining_variants=allele_defining_variants,
dict_allele_definitions=allele_definitions,
alleles_to_test=gregarious_alleles,
missing_positions=m,
phased=m_phased)
# append to dict_predicted_calls
for key in dict_predicted_calls_m:
dict_predicted_calls[key] = dict_predicted_calls.get(key, []) + dict_predicted_calls_m[key]
dict_predicted_calls[key] = (dict_predicted_calls.get(key, []) +
dict_predicted_calls_m[key])
else:
dict_predicted_calls = util.predict_pharmcat_calls(allele_defining_variants,
allele_definitions,
gregarious_alleles)
dict_predicted_calls = util.predict_pharmcat_calls(
dict_allele_defining_variants=allele_defining_variants,
dict_allele_definitions=allele_definitions,
alleles_to_test=gregarious_alleles,
phased=m_phased)

# convert the python dictionary to a pandas data frame for output
rez = pd.DataFrame.from_dict(dict_predicted_calls)
# add gene name to the data frame for readability in the output
rez['gene'] = gene
# write to an output
if len(rez):
if args.clinical_calls:
if m_clinical_calls:
# keep only diplotypes that have multiple top-scored calls
mask = rez[rez['actual'].str.contains(';')]
# remove duplicates
Expand Down
Loading

0 comments on commit a8a786c

Please sign in to comment.