Skip to content

Commit

Permalink
(SV) re-interpreting CPX records by experimental interpretation tool (#…
Browse files Browse the repository at this point in the history
…4602)

* (SV) feature commit:
 adds a new tested tool named CpxVariantReInterprepterSpark
    to extract barebone-annotated simple variants from
    an GATK-SV discovery pipeline produced VCF containing complex variants
 it is hooked up to SvDiscoverFromLocalAssemblyContigAlignmentsSpark
    (hence StructuralVariationDiscoveryPipelineSpark as well)
also 
  * refactor test utils and data provider for sv discovery subpackage
  * make changes in StructuralVariationDiscoveryPipelineSparkIntegrationTest to accomodate later integration tests
  • Loading branch information
SHuang-Broad authored May 24, 2018
1 parent c8f53cd commit f005644
Show file tree
Hide file tree
Showing 38 changed files with 5,970 additions and 371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ protected void runTool( final JavaSparkContext ctx ) {

final List<VariantContext> annotatedVariants = processEvidenceTargetLinks(assemblyBasedVariants, svDiscoveryInputMetaData);

final String outputPath = svDiscoveryInputMetaData.outputPath;
final SAMSequenceDictionary refSeqDictionary = svDiscoveryInputMetaData.referenceData.referenceSequenceDictionaryBroadcast.getValue();
final Logger toolLogger = svDiscoveryInputMetaData.toolLogger;
final String outputPath = svDiscoveryInputMetaData.getOutputPath();
final SAMSequenceDictionary refSeqDictionary = svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast().getValue();
final Logger toolLogger = svDiscoveryInputMetaData.getToolLogger();
SVVCFWriter.writeVCF(annotatedVariants, outputPath + "inv_del_ins.vcf", refSeqDictionary, toolLogger);

// TODO: 1/14/18 this is the next version of precise variant calling
Expand Down Expand Up @@ -232,12 +232,12 @@ private static List<VariantContext> processEvidenceTargetLinks(List<VariantConte
final SvDiscoveryInputMetaData svDiscoveryInputMetaData) {

final List<VariantContext> annotatedVariants;
if (svDiscoveryInputMetaData.sampleSpecificData.evidenceTargetLinks != null) {
final PairedStrandedIntervalTree<EvidenceTargetLink> evidenceTargetLinks = svDiscoveryInputMetaData.sampleSpecificData.evidenceTargetLinks;
final ReadMetadata readMetadata = svDiscoveryInputMetaData.sampleSpecificData.readMetadata;
final ReferenceMultiSource reference = svDiscoveryInputMetaData.referenceData.referenceBroadcast.getValue();
final DiscoverVariantsFromContigsAlignmentsSparkArgumentCollection discoverStageArgs = svDiscoveryInputMetaData.discoverStageArgs;
final Logger toolLogger = svDiscoveryInputMetaData.toolLogger;
if (svDiscoveryInputMetaData.getSampleSpecificData().getEvidenceTargetLinks() != null) {
final PairedStrandedIntervalTree<EvidenceTargetLink> evidenceTargetLinks = svDiscoveryInputMetaData.getSampleSpecificData().getEvidenceTargetLinks();
final ReadMetadata readMetadata = svDiscoveryInputMetaData.getSampleSpecificData().getReadMetadata();
final ReferenceMultiSource reference = svDiscoveryInputMetaData.getReferenceData().getReferenceBroadcast().getValue();
final DiscoverVariantsFromContigsAlignmentsSparkArgumentCollection discoverStageArgs = svDiscoveryInputMetaData.getDiscoverStageArgs();
final Logger toolLogger = svDiscoveryInputMetaData.getToolLogger();

// annotate with evidence links
annotatedVariants = AnnotatedVariantProducer.
Expand Down Expand Up @@ -265,22 +265,22 @@ private static void experimentalInterpretation(final JavaSparkContext ctx,

final JavaRDD<GATKRead> assemblyRawAlignments = getContigRawAlignments(ctx, assembledEvidenceResults, svDiscoveryInputMetaData);

final String updatedOutputPath = svDiscoveryInputMetaData.outputPath + "experimentalInterpretation_";
final String updatedOutputPath = svDiscoveryInputMetaData.getOutputPath() + "experimentalInterpretation_";
svDiscoveryInputMetaData.updateOutputPath(updatedOutputPath);

SvDiscoverFromLocalAssemblyContigAlignmentsSpark.AssemblyContigsClassifiedByAlignmentSignatures contigsByPossibleRawTypes
= SvDiscoverFromLocalAssemblyContigAlignmentsSpark.preprocess(svDiscoveryInputMetaData, assemblyRawAlignments);

SvDiscoverFromLocalAssemblyContigAlignmentsSpark.dispatchJobs(contigsByPossibleRawTypes, svDiscoveryInputMetaData,
assemblyRawAlignments, true);
SvDiscoverFromLocalAssemblyContigAlignmentsSpark.dispatchJobs(ctx, contigsByPossibleRawTypes,
svDiscoveryInputMetaData, assemblyRawAlignments, true);
}

private static JavaRDD<GATKRead> getContigRawAlignments(final JavaSparkContext ctx,
final FindBreakpointEvidenceSpark.AssembledEvidenceResults assembledEvidenceResults,
final SvDiscoveryInputMetaData svDiscoveryInputMetaData) {
final Broadcast<SAMSequenceDictionary> referenceSequenceDictionaryBroadcast =
svDiscoveryInputMetaData.referenceData.referenceSequenceDictionaryBroadcast;
final Broadcast<SAMFileHeader> headerBroadcast = svDiscoveryInputMetaData.sampleSpecificData.headerBroadcast;
svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast();
final Broadcast<SAMFileHeader> headerBroadcast = svDiscoveryInputMetaData.getSampleSpecificData().getHeaderBroadcast();
final SAMFileHeader headerForReads = headerBroadcast.getValue();
final SAMReadGroupRecord contigAlignmentsReadGroup = new SAMReadGroupRecord(SVUtils.GATKSV_CONTIG_ALIGNMENTS_READ_GROUP_ID);
final List<String> refNames = SequenceDictionaryUtils.getContigNamesList(referenceSequenceDictionaryBroadcast.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ protected void runTool(final JavaSparkContext ctx) {
final JavaRDD<AlignedContig> parsedContigAlignments =
new SvDiscoverFromLocalAssemblyContigAlignmentsSpark
.SAMFormattedContigAlignmentParser(getReads(),
svDiscoveryInputMetaData.sampleSpecificData.headerBroadcast.getValue(), true)
svDiscoveryInputMetaData.getSampleSpecificData().getHeaderBroadcast().getValue(), true)
.getAlignedContigs();

// assembly-based breakpoints
List<VariantContext> annotatedVariants = discoverVariantsFromChimeras(svDiscoveryInputMetaData, parsedContigAlignments);

final SAMSequenceDictionary refSeqDictionary = svDiscoveryInputMetaData.referenceData.referenceSequenceDictionaryBroadcast.getValue();
final SAMSequenceDictionary refSeqDictionary = svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast().getValue();
SVVCFWriter.writeVCF(annotatedVariants, vcfOutputFile, refSeqDictionary, localLogger);
}

Expand All @@ -162,7 +162,7 @@ public static List<VariantContext> discoverVariantsFromChimeras(final SvDiscover
final JavaRDD<AlignedContig> alignedContigs) {

final Broadcast<SAMSequenceDictionary> referenceSequenceDictionaryBroadcast =
svDiscoveryInputMetaData.referenceData.referenceSequenceDictionaryBroadcast;
svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast();

final JavaPairRDD<byte[], List<SimpleChimera>> contigSeqAndChimeras =
alignedContigs
Expand All @@ -175,12 +175,12 @@ public static List<VariantContext> discoverVariantsFromChimeras(final SvDiscover
return new Tuple2<>(alignedContig.getContigSequence(), chimeras);
});

final Broadcast<ReferenceMultiSource> referenceBroadcast = svDiscoveryInputMetaData.referenceData.referenceBroadcast;
final List<SVInterval> assembledIntervals = svDiscoveryInputMetaData.sampleSpecificData.assembledIntervals;
final Broadcast<SVIntervalTree<VariantContext>> cnvCallsBroadcast = svDiscoveryInputMetaData.sampleSpecificData.cnvCallsBroadcast;
final String sampleId = svDiscoveryInputMetaData.sampleSpecificData.sampleId;
final DiscoverVariantsFromContigsAlignmentsSparkArgumentCollection discoverStageArgs = svDiscoveryInputMetaData.discoverStageArgs;
final Logger toolLogger = svDiscoveryInputMetaData.toolLogger;
final Broadcast<ReferenceMultiSource> referenceBroadcast = svDiscoveryInputMetaData.getReferenceData().getReferenceBroadcast();
final List<SVInterval> assembledIntervals = svDiscoveryInputMetaData.getSampleSpecificData().getAssembledIntervals();
final Broadcast<SVIntervalTree<VariantContext>> cnvCallsBroadcast = svDiscoveryInputMetaData.getSampleSpecificData().getCnvCallsBroadcast();
final String sampleId = svDiscoveryInputMetaData.getSampleSpecificData().getSampleId();
final DiscoverVariantsFromContigsAlignmentsSparkArgumentCollection discoverStageArgs = svDiscoveryInputMetaData.getDiscoverStageArgs();
final Logger toolLogger = svDiscoveryInputMetaData.getToolLogger();

final JavaPairRDD<NovelAdjacencyAndAltHaplotype, Iterable<SimpleChimera>> narlsAndSources =
contigSeqAndChimeras
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package org.broadinstitute.hellbender.tools.spark.sv.discovery;

import com.google.common.annotations.VisibleForTesting;
import htsjdk.samtools.CigarOperator;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMFileWriter;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.*;
import htsjdk.samtools.util.SequenceUtil;
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.logging.log4j.LogManager;
Expand All @@ -26,6 +23,7 @@
import org.broadinstitute.hellbender.tools.spark.sv.StructuralVariationDiscoveryPipelineSpark;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.alignment.*;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.inference.CpxVariantInterpreter;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.inference.SegmentedCpxVariantSimpleVariantExtractor;
import org.broadinstitute.hellbender.tools.spark.sv.discovery.inference.SimpleNovelAdjacencyInterpreter;
import org.broadinstitute.hellbender.tools.spark.sv.utils.SVFileUtils;
import org.broadinstitute.hellbender.tools.spark.sv.utils.SVIntervalTree;
Expand Down Expand Up @@ -151,22 +149,34 @@ protected void runTool(final JavaSparkContext ctx) {
final AssemblyContigsClassifiedByAlignmentSignatures contigsByPossibleRawTypes =
preprocess(svDiscoveryInputMetaData, assemblyRawAlignments);

dispatchJobs(contigsByPossibleRawTypes, svDiscoveryInputMetaData, assemblyRawAlignments, writeSAMFiles);
dispatchJobs(ctx, contigsByPossibleRawTypes, svDiscoveryInputMetaData, assemblyRawAlignments, writeSAMFiles);
}

//==================================================================================================================

public static final class AssemblyContigsClassifiedByAlignmentSignatures {
final JavaRDD<AssemblyContigWithFineTunedAlignments> unknown;
final JavaRDD<AssemblyContigWithFineTunedAlignments> simple;
final JavaRDD<AssemblyContigWithFineTunedAlignments> complex;
private final JavaRDD<AssemblyContigWithFineTunedAlignments> unknown;
private final JavaRDD<AssemblyContigWithFineTunedAlignments> simple;
private final JavaRDD<AssemblyContigWithFineTunedAlignments> complex;

private AssemblyContigsClassifiedByAlignmentSignatures(final JavaRDD<AssemblyContigWithFineTunedAlignments> contigs) {
unknown = contigs.filter(tig -> tig.getAlignmentSignatureBasicType().equals(UNKNOWN)).cache();
simple = contigs.filter(tig -> tig.getAlignmentSignatureBasicType().equals(SIMPLE_CHIMERA)).cache();
complex = contigs.filter(tig -> tig.getAlignmentSignatureBasicType().equals(COMPLEX)).cache();
}

public JavaRDD<AssemblyContigWithFineTunedAlignments> getContigsWithSignatureClassifiedAsUnknown() {
return unknown;
}

public JavaRDD<AssemblyContigWithFineTunedAlignments> getContigsWithSignatureClassifiedAsSimpleChimera() {
return simple;
}

public JavaRDD<AssemblyContigWithFineTunedAlignments> getContigsWithSignatureClassifiedAsComplex() {
return complex;
}

/**
* Write SAM file, if requested, for original alignments of contigs recognized as "Ambiguous", "Incomplete", and "MisAssemblySuspect"
* TODO: 11/17/17 salvation on assembly contigs that 1) has ambiguous "best" configuration, and 2) has incomplete picture; and flag accordingly
Expand Down Expand Up @@ -206,9 +216,9 @@ private void writeSAMfilesForUnknown(final String outputPrefix, final JavaRDD<GA
public static AssemblyContigsClassifiedByAlignmentSignatures preprocess(final SvDiscoveryInputMetaData svDiscoveryInputMetaData,
final JavaRDD<GATKRead> assemblyRawAlignments) {

final Broadcast<SAMFileHeader> headerBroadcast = svDiscoveryInputMetaData.sampleSpecificData.headerBroadcast;
final Broadcast<Set<String>> canonicalChromosomesBroadcast = svDiscoveryInputMetaData.referenceData.canonicalChromosomesBroadcast;
final Logger toolLogger = svDiscoveryInputMetaData.toolLogger;
final Broadcast<SAMFileHeader> headerBroadcast = svDiscoveryInputMetaData.getSampleSpecificData().getHeaderBroadcast();
final Broadcast<Set<String>> canonicalChromosomesBroadcast = svDiscoveryInputMetaData.getReferenceData().getCanonicalChromosomesBroadcast();
final Logger toolLogger = svDiscoveryInputMetaData.getToolLogger();

final JavaRDD<AssemblyContigWithFineTunedAlignments> contigsWithChimericAlignmentsReconstructed =
AssemblyContigAlignmentsConfigPicker
Expand All @@ -231,33 +241,44 @@ public static AssemblyContigsClassifiedByAlignmentSignatures preprocess(final Sv
* {@link AssemblyContigWithFineTunedAlignments.AlignmentSignatureBasicType#UNKNOWN}
* currently DO NOT generate any VCF yet.
*/
public static void dispatchJobs(final AssemblyContigsClassifiedByAlignmentSignatures contigsByPossibleRawTypes,
public static void dispatchJobs(final JavaSparkContext ctx,
final AssemblyContigsClassifiedByAlignmentSignatures contigsByPossibleRawTypes,
final SvDiscoveryInputMetaData svDiscoveryInputMetaData,
final JavaRDD<GATKRead> assemblyRawAlignments,
final boolean writeSAMFiles) {

final String outputPrefixWithSampleName = svDiscoveryInputMetaData.outputPath;
final String outputPrefixWithSampleName = svDiscoveryInputMetaData.getOutputPath();

// TODO: 1/10/18 bring back read annotation, see ticket 4228

final List<VariantContext> simpleVariants =
SimpleNovelAdjacencyInterpreter.makeInterpretation(contigsByPossibleRawTypes.simple, svDiscoveryInputMetaData);
contigsByPossibleRawTypes.simple.unpersist();
SVVCFWriter.writeVCF(simpleVariants, outputPrefixWithSampleName + "NonComplex.vcf",
svDiscoveryInputMetaData.referenceData.referenceSequenceDictionaryBroadcast.getValue(),
svDiscoveryInputMetaData.toolLogger);
svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast().getValue(),
svDiscoveryInputMetaData.getToolLogger());

final List<VariantContext> complexVariants =
CpxVariantInterpreter.makeInterpretation(contigsByPossibleRawTypes.complex, svDiscoveryInputMetaData);
contigsByPossibleRawTypes.complex.unpersist();
SVVCFWriter.writeVCF(complexVariants, outputPrefixWithSampleName + "Complex.vcf",
svDiscoveryInputMetaData.referenceData.referenceSequenceDictionaryBroadcast.getValue(),
svDiscoveryInputMetaData.toolLogger);
svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast().getValue(),
svDiscoveryInputMetaData.getToolLogger());

if (writeSAMFiles) {
contigsByPossibleRawTypes.writeSAMfilesForUnknown(outputPrefixWithSampleName, assemblyRawAlignments,
svDiscoveryInputMetaData.sampleSpecificData.headerBroadcast.getValue());
svDiscoveryInputMetaData.getSampleSpecificData().getHeaderBroadcast().getValue());
}

final JavaRDD<VariantContext> complexVariantsRDD = ctx.parallelize(complexVariants);
final SegmentedCpxVariantSimpleVariantExtractor.ExtractedSimpleVariants reInterpretedSimple =
SegmentedCpxVariantSimpleVariantExtractor.extract(complexVariantsRDD, svDiscoveryInputMetaData, assemblyRawAlignments);
final SAMSequenceDictionary refSeqDict = svDiscoveryInputMetaData.getReferenceData().getReferenceSequenceDictionaryBroadcast().getValue();
final Logger toolLogger = svDiscoveryInputMetaData.getToolLogger();
final String derivedOneSegmentSimpleVCF = outputPrefixWithSampleName + "cpx_reinterpreted_simple_1_seg.vcf";
final String derivedMultiSegmentSimpleVCF = outputPrefixWithSampleName + "cpx_reinterpreted_simple_multi_seg.vcf";
SVVCFWriter.writeVCF(reInterpretedSimple.getReInterpretZeroOrOneSegmentCalls(), derivedOneSegmentSimpleVCF, refSeqDict, toolLogger);
SVVCFWriter.writeVCF(reInterpretedSimple.getReInterpretMultiSegmentsCalls(), derivedMultiSegmentSimpleVCF, refSeqDict, toolLogger);
}

//==================================================================================================================
Expand Down
Loading

0 comments on commit f005644

Please sign in to comment.