feat: change varfish-annotator to mehari (#392) (#393)
holtgrewe authored May 5, 2023
1 parent 7759d4d · commit c86c665
Showing 12 changed files with 603 additions and 338 deletions.
70 changes: 35 additions & 35 deletions snappy_pipeline/workflows/varfish_export/Snakefile
@@ -45,67 +45,67 @@ rule varfish_export_write_pedigree_run:
wf.substep_dispatch("write_pedigree", "run", wildcards, output)


# Run varfish-annotator-cli annotate ------------------------------------------
# Run varfish-annotator-cli annotate-seqvars -----------------------------------


rule varfish_export_varfish_annotator_annotate:
rule varfish_export_mehari_annotate_seqvars:
input:
unpack(wf.get_input_files("varfish_annotator", "annotate")),
unpack(wf.get_input_files("mehari", "annotate_seqvars")),
output:
**wf.get_output_files("varfish_annotator", "annotate"),
threads: wf.get_resource("varfish_annotator", "annotate", "threads")
**wf.get_output_files("mehari", "annotate_seqvars"),
threads: wf.get_resource("mehari", "annotate_seqvars", "threads")
resources:
time=wf.get_resource("varfish_annotator", "annotate", "time"),
memory=wf.get_resource("varfish_annotator", "annotate", "memory"),
partition=wf.get_resource("varfish_annotator", "annotate", "partition"),
tmpdir=wf.get_resource("varfish_annotator", "annotate", "tmpdir"),
time=wf.get_resource("mehari", "annotate_seqvars", "time"),
memory=wf.get_resource("mehari", "annotate_seqvars", "memory"),
partition=wf.get_resource("mehari", "annotate_seqvars", "partition"),
tmpdir=wf.get_resource("mehari", "annotate_seqvars", "tmpdir"),
log:
**wf.get_log_file("varfish_annotator", "annotate"),
**wf.get_log_file("mehari", "annotate_seqvars"),
params:
**{"args": wf.get_params("varfish_annotator", "annotate")},
**{"args": wf.get_params("mehari", "annotate_seqvars")},
wrapper:
wf.wrapper_path("varfish_annotator/annotate")
wf.wrapper_path("mehari/annotate_seqvars")


# Run varfish-annotator-cli annotate-svs ---------------------------------------
# Run varfish-annotator-cli annotate-strucvars ---------------------------------


rule varfish_export_varfish_annotator_annotate_svs:
rule varfish_export_mehari_annotate_strucvars:
input:
unpack(wf.get_input_files("varfish_annotator", "annotate_svs")),
unpack(wf.get_input_files("mehari", "annotate_strucvars")),
output:
**wf.get_output_files("varfish_annotator", "annotate_svs"),
threads: wf.get_resource("varfish_annotator", "annotate_svs", "threads")
**wf.get_output_files("mehari", "annotate_strucvars"),
threads: wf.get_resource("mehari", "annotate_strucvars", "threads")
resources:
time=wf.get_resource("varfish_annotator", "annotate_svs", "time"),
memory=wf.get_resource("varfish_annotator", "annotate_svs", "memory"),
partition=wf.get_resource("varfish_annotator", "annotate_svs", "partition"),
tmpdir=wf.get_resource("varfish_annotator", "annotate_svs", "tmpdir"),
time=wf.get_resource("mehari", "annotate_strucvars", "time"),
memory=wf.get_resource("mehari", "annotate_strucvars", "memory"),
partition=wf.get_resource("mehari", "annotate_strucvars", "partition"),
tmpdir=wf.get_resource("mehari", "annotate_strucvars", "tmpdir"),
log:
**wf.get_log_file("varfish_annotator", "annotate_svs"),
**wf.get_log_file("mehari", "annotate_strucvars"),
params:
**{"args": wf.get_params("varfish_annotator", "annotate_svs")},
**{"args": wf.get_params("mehari", "annotate_strucvars")},
wrapper:
wf.wrapper_path("varfish_annotator/annotate_svs")
wf.wrapper_path("mehari/annotate_strucvars")


# Gather statistics about the alignment ---------------------------------------


rule varfish_export_varfish_annotator_bam_qc:
rule varfish_export_mehari_bam_qc:
input:
unpack(wf.get_input_files("varfish_annotator", "bam_qc")),
unpack(wf.get_input_files("mehari", "bam_qc")),
output:
**wf.get_output_files("varfish_annotator", "bam_qc"),
threads: wf.get_resource("varfish_annotator", "bam_qc", "threads")
**wf.get_output_files("mehari", "bam_qc"),
threads: wf.get_resource("mehari", "bam_qc", "threads")
resources:
time=wf.get_resource("varfish_annotator", "bam_qc", "time"),
memory=wf.get_resource("varfish_annotator", "bam_qc", "memory"),
partition=wf.get_resource("varfish_annotator", "bam_qc", "partition"),
tmpdir=wf.get_resource("varfish_annotator", "bam_qc", "tmpdir"),
time=wf.get_resource("mehari", "bam_qc", "time"),
memory=wf.get_resource("mehari", "bam_qc", "memory"),
partition=wf.get_resource("mehari", "bam_qc", "partition"),
tmpdir=wf.get_resource("mehari", "bam_qc", "tmpdir"),
log:
**wf.get_log_file("varfish_annotator", "bam_qc"),
**wf.get_log_file("mehari", "bam_qc"),
params:
**{"args": wf.get_params("varfish_annotator", "bam_qc")},
**{"args": wf.get_params("mehari", "bam_qc")},
wrapper:
wf.wrapper_path("varfish_annotator/bam_qc")
wf.wrapper_path("mehari/bam_qc")
72 changes: 33 additions & 39 deletions snappy_pipeline/workflows/varfish_export/__init__.py
@@ -121,26 +121,19 @@
release: GRCh37 # REQUIRED: default 'GRCh37'
# Path to BED file with exons; used for reducing data to near-exon small variants.
path_exon_bed: null # REQUIRED: exon BED file to use
# Path to Jannovar RefSeq ``.ser`` file for annotation
path_refseq_ser: REQUIRED # REQUIRED: path to RefSeq .ser file
# Path to Jannovar ENSEMBL ``.ser`` file for annotation
path_ensembl_ser: REQUIRED # REQUIRED: path to ENSEMBL .ser file
# Path to VarFish annotator database file to use for annotating.
path_db: REQUIRED # REQUIRED: spath to varfish-annotator DB file to use
# Path to mehari database.
path_mehari_db: REQUIRED # REQUIRED: path to mehari database
"""


class VarfishAnnotatorAnnotateStepPart(VariantCallingGetLogFileMixin, BaseStepPart):
"""This step part is responsible for annotating the variants with VarFish Annotator"""
class MehariStepPart(VariantCallingGetLogFileMixin, BaseStepPart):
"""This step part is responsible for annotating the variants with Mehari"""

name = "varfish_annotator"
actions = ("annotate", "annotate_svs", "bam_qc")
name = "mehari"
actions = ("annotate_seqvars", "annotate_strucvars", "bam_qc")

def __init__(self, parent):
super().__init__(parent)
self.base_path_out = (
"work/{mapper}.{var_caller}.varfish_annotated.{index_ngs_library}/out/.done"
)
# Build shortcut from index library name to pedigree
self.index_ngs_library_to_pedigree = {}
for sheet in self.parent.shortcut_sheets:
@@ -159,7 +152,7 @@ def get_log_file(self, action: str) -> SnakemakeDictItemsGenerator:
self._validate_action(action)
prefix = (
"work/{mapper}.varfish_export.{index_ngs_library}/log/"
f"{{mapper}}.varfish_annotator_{action}.{{index_ngs_library}}"
f"{{mapper}}.mehari_{action}.{{index_ngs_library}}"
)
key_ext = (
("wrapper", ".wrapper.py"),
@@ -187,16 +180,16 @@ def get_resource_usage(self, action: str) -> ResourceUsage:
@listify
def get_result_files(self, action):
# Generate templates to the output paths from action's result files.
if action == "annotate":
raw_path_tpls = self._get_output_files_annotate().values()
elif action == "annotate_svs":
# Only annotate SVs if path to step for calling them is configured.
if action == "annotate_seqvars":
raw_path_tpls = self._get_output_files_annotate_seqvars().values()
elif action == "annotate_strucvars":
# Only annotate_seqvars SVs if path to step for calling them is configured.
if (
not self.parent.config["path_sv_calling_targeted"]
and not self.parent.config["path_sv_calling_wgs"]
):
return
raw_path_tpls = self._get_output_files_annotate_svs().values()
raw_path_tpls = self._get_output_files_annotate_strucvars().values()
elif action == "bam_qc":
raw_path_tpls = self._get_output_files_bam_qc().values()
# Filter the templates to the paths in the output directory.
@@ -205,7 +198,8 @@ def get_result_files(self, action):
# Create concrete paths for all pedigrees in the sample sheet.
index_ngs_libraries = self._get_index_ngs_libraries(
require_consistent_pedigree_kits=(
bool(self.parent.config["path_sv_calling_targeted"]) and (action == "annotate_svs")
bool(self.parent.config["path_sv_calling_targeted"])
and (action == "annotate_strucvars")
)
)
kwargs = {
@@ -249,7 +243,7 @@ def _is_pedigree_good(self, pedigree: Pedigree) -> bool:
return not msg

@dictify
def _get_input_files_annotate(self, wildcards):
def _get_input_files_annotate_seqvars(self, wildcards):
yield "ped", "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped"

variant_calling = self.parent.sub_workflows["variant_calling"]
@@ -271,13 +265,13 @@ def _get_input_files_annotate(self, wildcards):
yield "vcf", vcfs

@dictify
def _get_output_files_annotate(self):
def _get_output_files_annotate_seqvars(self):
# Generate paths in "work/" directory
prefix = (
"work/{mapper}.varfish_export.{index_ngs_library}/out/"
"{mapper}.varfish_annotator_annotate.{index_ngs_library}"
"{mapper}.mehari_annotate_seqvars.{index_ngs_library}"
)
work_paths = { # annotate will write out PED file
work_paths = { # annotate_seqvars will write out PED file
"ped": f"{prefix}.ped",
"ped_md5": f"{prefix}.ped.md5",
"gts": f"{prefix}.gts.tsv.gz",
@@ -289,10 +283,12 @@ def _get_output_files_annotate(self):
# Generate paths in "output/" directory
yield "output_links", [
re.sub(r"^work/", "output/", work_path)
for work_path in chain(work_paths.values(), self.get_log_file("annotate").values())
for work_path in chain(
work_paths.values(), self.get_log_file("annotate_seqvars").values()
)
]

def _get_params_annotate(self, wildcards: Wildcards) -> typing.Dict[str, typing.Any]:
def _get_params_annotate_seqvars(self, wildcards: Wildcards) -> typing.Dict[str, typing.Any]:
pedigree = self.index_ngs_library_to_pedigree[wildcards.index_ngs_library]
for donor in pedigree.donors:
if (
@@ -303,7 +299,7 @@ def _get_params_annotate(self, wildcards: Wildcards) -> typing.Dict[str, typing.Any]:
return {"step_name": "varfish_export"}

@dictify
def _get_input_files_annotate_svs(self, wildcards):
def _get_input_files_annotate_strucvars(self, wildcards):
yield "ped", "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped"

if self.parent.config["path_sv_calling_targeted"]:
@@ -387,28 +383,28 @@ def _get_input_files_annotate_svs(self, wildcards):
yield "vcf_cov", cov_vcfs

@dictify
def _get_output_files_annotate_svs(self):
def _get_output_files_annotate_strucvars(self):
prefix = (
"work/{mapper}.varfish_export.{index_ngs_library}/out/"
"{mapper}.varfish_annotator_annotate_svs.{index_ngs_library}"
"{mapper}.mehari_annotate_strucvars.{index_ngs_library}"
)
work_paths = {
"gts": f"{prefix}.gts.tsv.gz",
"gts_md5": f"{prefix}.gts.tsv.gz.md5",
"feature_effects": f"{prefix}.feature-effects.tsv.gz",
"feature_effects_md5": f"{prefix}.feature-effects.tsv.gz.md5",
"db_infos": f"{prefix}.db-infos.tsv.gz",
"db_infos_md5": f"{prefix}.db-infos.tsv.gz.md5",
}
yield from work_paths.items()
# Generate paths in "output/" directory
yield "output_links", [
re.sub(r"^work/", "output/", work_path)
for work_path in chain(work_paths.values(), self.get_log_file("annotate_svs").values())
for work_path in chain(
work_paths.values(), self.get_log_file("annotate_strucvars").values()
)
]

#: Alias the get params function.
_get_params_annotate_svs = _get_params_annotate
_get_params_annotate_strucvars = _get_params_annotate_seqvars

@dictify
def _get_input_files_bam_qc(self, wildcards):
@@ -439,7 +435,7 @@ def _get_input_files_bam_qc(self, wildcards):
def _get_output_files_bam_qc(self) -> SnakemakeDictItemsGenerator:
prefix = (
"work/{mapper}.varfish_export.{index_ngs_library}/out/"
"{mapper}.varfish_annotator_bam_qc.{index_ngs_library}"
"{mapper}.mehari_bam_qc.{index_ngs_library}"
)
work_paths = {
"bam_qc": f"{prefix}.bam-qc.tsv.gz",
@@ -502,9 +498,7 @@ def __init__(self, workflow, config, config_lookup_paths, config_paths, workdir)
)

# Register sub step classes so the sub steps are available
self.register_sub_step_classes(
(WritePedigreeStepPart, VarfishAnnotatorAnnotateStepPart, LinkOutStepPart)
)
self.register_sub_step_classes((WritePedigreeStepPart, MehariStepPart, LinkOutStepPart))

# Register sub workflows
self.register_sub_workflow("variant_calling", self.config["path_variant_calling"])
@@ -564,8 +558,8 @@ def get_result_files(self):
We will process all primary DNA libraries and perform joint calling within pedigrees
"""
for action in self.sub_steps["varfish_annotator"].actions:
yield from self.sub_steps["varfish_annotator"].get_result_files(action)
for action in self.sub_steps["mehari"].actions:
yield from self.sub_steps["mehari"].get_result_files(action)

def check_config(self):
self.ensure_w_config(
(Diffs for the remaining 10 changed files are not shown.)
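On the configuration side, the three Jannovar/VarFish Annotator database keys (path_refseq_ser, path_ensembl_ser, path_db) collapse into the single path_mehari_db key shown in the DEFAULT_CONFIG fragment above. Below is a minimal sketch of how a project configuration could look after this change, assuming the usual step_config layout of snappy-pipeline configuration files; all concrete paths are placeholders.

import yaml

# Hypothetical varfish_export configuration fragment after the switch to mehari.
# Key names follow the DEFAULT_CONFIG above; the concrete paths are made up.
config_fragment = """
step_config:
  varfish_export:
    release: GRCh37
    path_exon_bed: /data/targets/exons.bed
    path_mehari_db: /data/mehari/db
    path_variant_calling: ../variant_calling
"""

config = yaml.safe_load(config_fragment)
varfish_export = config["step_config"]["varfish_export"]

# The obsolete keys should be gone and the new database path must be filled in.
for old_key in ("path_refseq_ser", "path_ensembl_ser", "path_db"):
    assert old_key not in varfish_export
assert varfish_export["path_mehari_db"] not in (None, "REQUIRED")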
