Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resurrect VCF conversion code #147

Merged
merged 5 commits into from
Feb 28, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,119 @@
package edu.berkeley.cs.amplab.adam.converters

import edu.berkeley.cs.amplab.adam.avro._
import edu.berkeley.cs.amplab.adam.models.{ADAMVariantContext, ReferencePosition}
import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
import java.lang.NumberFormatException
import java.util
import org.apache.spark.Logging
import org.broadinstitute.variant.variantcontext.{Allele, Genotype, VariantContext}
import edu.berkeley.cs.amplab.adam.models.{ADAMVariantContext, SequenceDictionary, ReferencePosition}
import edu.berkeley.cs.amplab.adam.util.VcfStringUtils._
import org.broadinstitute.variant.vcf._
import org.broadinstitute.variant.variantcontext._
import org.broadinstitute.variant.vcf.VCFConstants
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._

object VariantContextConverter {
/** Coerces a raw VCF attribute value to a boxed java.lang.Integer.
  * Strings are parsed, Integers pass through, other Numbers are truncated.
  * Any other runtime type raises a MatchError. */
private def attrAsInt(attr: Object): Object = attr match {
  case s: String            => java.lang.Integer.valueOf(s)
  case i: java.lang.Integer => i
  case n: java.lang.Number  => java.lang.Integer.valueOf(n.intValue)
}
/** Coerces a raw VCF attribute value to a boxed java.lang.Long.
  * Strings are parsed, Longs pass through, other Numbers are widened/truncated.
  * Any other runtime type raises a MatchError.
  *
  * Bug fix: the String case previously returned java.lang.Integer.valueOf(a),
  * so string-typed attributes produced Integers from a Long-producing converter
  * (and Integer(5) != Long(5) under equals). */
private def attrAsLong(attr: Object): Object = attr match {
  case s: String           => java.lang.Long.valueOf(s)
  case l: java.lang.Long   => l
  case n: java.lang.Number => java.lang.Long.valueOf(n.longValue)
}
/** Coerces a raw VCF attribute value to a boxed java.lang.Float.
  * Strings are parsed, Floats pass through, other Numbers are narrowed.
  * Any other runtime type raises a MatchError. */
private def attrAsFloat(attr: Object): Object = attr match {
  case s: String           => java.lang.Float.valueOf(s)
  case f: java.lang.Float  => f
  case n: java.lang.Number => java.lang.Float.valueOf(n.floatValue)
}
/** Passes a String attribute through unchanged.
  * Deliberately partial: a non-String value raises a MatchError, signalling
  * an unexpected VCF attribute type rather than silently stringifying it. */
private def attrAsString(attr: Object): Object = attr match {
  case s: String => s
}
/** Coerces a raw VCF attribute value to a boxed java.lang.Boolean.
  * Booleans pass through; Strings are parsed via Boolean.valueOf.
  * Any other runtime type raises a MatchError. */
private def attrAsBoolean(attr: Object): Object = attr match {
  case b: java.lang.Boolean => b
  case s: String            => java.lang.Boolean.valueOf(s)
}


/**
 * Binds an ADAM (Avro) field name to the VCF header line it comes from and a
 * converter for the attribute's raw value.
 *
 * @param adamKey name of the target field in the ADAM Avro schema
 * @param attrConverter converts the raw VCF attribute value to the Java type the
 *                      Avro field expects; null marks keys that are handled
 *                      explicitly and skipped by the generic attribute copier
 * @param hdrLine VCF INFO/FORMAT header line describing the key
 */
private case class AttrKey(adamKey: String, attrConverter: (Object => Object), hdrLine: VCFCompoundHeaderLine) {
  // The VCF-side key is the header line's ID.
  val vcfKey: String = hdrLine.getID
}

// VCF INFO keys that map onto VariantCallingAnnotations fields, paired with
// their header lines (standard lines where GATK defines them, custom
// VCFInfoHeaderLine otherwise) and the converter for each value's type.
private val INFO_KEYS: Seq[AttrKey] = Seq(
  AttrKey("clippingRankSum", attrAsFloat _, new VCFInfoHeaderLine("ClippingRankSum", 1, VCFHeaderLineType.Float, "Z-score From Wilcoxon rank sum test of Alt vs. Ref number of hard clipped bases")),
  AttrKey("readDepth", attrAsInt _, VCFStandardHeaderLines.getInfoLine(VCFConstants.DEPTH_KEY)),
  AttrKey("downsampled", attrAsBoolean _, VCFStandardHeaderLines.getInfoLine(VCFConstants.DOWNSAMPLED_KEY)),
  AttrKey("fisherStrandBiasPValue", attrAsFloat _, VCFStandardHeaderLines.getInfoLine(VCFConstants.STRAND_BIAS_KEY)),
  AttrKey("haplotypeScore", attrAsFloat _, new VCFInfoHeaderLine("HaplotypeScore", 1, VCFHeaderLineType.Float, "Consistency of the site with at most two segregating haplotypes")),
  AttrKey("inbreedingCoefficient", attrAsFloat _, new VCFInfoHeaderLine("InbreedingCoeff", 1, VCFHeaderLineType.Float, "Inbreeding coefficient as estimated from the genotype likelihoods per-sample when compared against the Hardy-Weinberg expectation")),
  AttrKey("rmsMapQ", attrAsFloat _, VCFStandardHeaderLines.getInfoLine(VCFConstants.RMS_MAPPING_QUALITY_KEY)),
  AttrKey("mapq0Reads", attrAsInt _, VCFStandardHeaderLines.getInfoLine(VCFConstants.MAPPING_QUALITY_ZERO_KEY)),
  AttrKey("mqRankSum", attrAsFloat _, new VCFInfoHeaderLine("MQRankSum", 1, VCFHeaderLineType.Float, "Z-score From Wilcoxon rank sum test of Alt vs. Ref read mapping qualities")),
  AttrKey("usedForNegativeTrainingSet", attrAsBoolean _, new VCFInfoHeaderLine("NEGATIVE_TRAIN_SITE", 1, VCFHeaderLineType.Flag, "This variant was used to build the negative training set of bad variants")),
  AttrKey("usedForPositiveTrainingSet", attrAsBoolean _, new VCFInfoHeaderLine("POSITIVE_TRAIN_SITE", 1, VCFHeaderLineType.Flag, "This variant was used to build the positive training set of good variants")),
  AttrKey("variantQualityByDepth", attrAsFloat _, new VCFInfoHeaderLine("QD", 1, VCFHeaderLineType.Float, "Variant Confidence/Quality by Depth")),
  AttrKey("readPositionRankSum", attrAsFloat _, new VCFInfoHeaderLine("ReadPosRankSum", 1, VCFHeaderLineType.Float, "Z-score from Wilcoxon rank sum test of Alt vs. Ref read position bias")),
  AttrKey("vqslod", attrAsFloat _, new VCFInfoHeaderLine("VQSLOD", 1, VCFHeaderLineType.Float, "Log odds ratio of being a true variant versus being false under the trained gaussian mixture model")),
  AttrKey("culprit", attrAsString _, new VCFInfoHeaderLine("culprit", 1, VCFHeaderLineType.String, "The annotation which was the worst performing in the Gaussian mixture model, likely the reason why the variant was filtered out"))
)

// VCF FORMAT (per-genotype) keys mapped onto ADAMGenotype fields.
// A null attrConverter marks keys that are converted explicitly elsewhere and
// must be skipped by the generic extended-attribute copier (see VCF2GTAnnos,
// which filters on attrConverter != null).
private val FORMAT_KEYS: Seq[AttrKey] = Seq(
AttrKey("alleles", null, VCFStandardHeaderLines.getFormatLine(VCFConstants.GENOTYPE_KEY)),
AttrKey("gtQuality", null, VCFStandardHeaderLines.getFormatLine(VCFConstants.GENOTYPE_QUALITY_KEY)),
AttrKey("readDepth", null, VCFStandardHeaderLines.getFormatLine(VCFConstants.DEPTH_KEY)),
AttrKey("alleleDepths", null, VCFStandardHeaderLines.getFormatLine(VCFConstants.GENOTYPE_ALLELE_DEPTHS)),
AttrKey("gtFilters", null, VCFStandardHeaderLines.getFormatLine(VCFConstants.GENOTYPE_FILTER_KEY)),
AttrKey("genotypeLikelihoods", null, VCFStandardHeaderLines.getFormatLine(VCFConstants.GENOTYPE_PL_KEY)),
// NOTE(review): the PQ header line declares Float but the converter is
// attrAsInt, and the VCF spec types PQ as Integer — confirm which is intended.
AttrKey("phaseQuality", attrAsInt _, new VCFFormatHeaderLine(VCFConstants.PHASE_QUALITY_KEY, 1, VCFHeaderLineType.Float, "Read-backed phasing quality")),
AttrKey("phaseSetId", attrAsInt _, new VCFFormatHeaderLine(VCFConstants.PHASE_SET_KEY, 1, VCFHeaderLineType.Integer, "Phase set"))
)

// All INFO header lines this converter knows about, for emitting VCF headers.
lazy val infoHeaderLines : Seq[VCFCompoundHeaderLine] = INFO_KEYS.map(_.hdrLine)
// All FORMAT header lines this converter knows about, for emitting VCF headers.
lazy val formatHeaderLines : Seq[VCFCompoundHeaderLine] = FORMAT_KEYS.map(_.hdrLine)

// Maps each VCF INFO key to the (Avro field position, value converter) pair
// used to populate VariantCallingAnnotations. Lazy because it reads the Avro
// class schema. (`val` replaces a needless `var`: the lookup is never reassigned.)
lazy val VCF2VarCallAnnos : Map[String,(Int,Object => Object)] = INFO_KEYS.map(field => {
  val avroField = VariantCallingAnnotations.getClassSchema.getField(field.adamKey)
  field.vcfKey -> (avroField.pos, field.attrConverter)
})(collection.breakOut)

// Maps each generically-converted VCF FORMAT key (those with a non-null
// converter) to the (Avro field position, value converter) pair used to
// populate ADAMGenotype. (`val` replaces a needless `var`.)
lazy val VCF2GTAnnos : Map[String,(Int,Object => Object)] = FORMAT_KEYS.filter(_.attrConverter != null).map(field => {
  val avroField = ADAMGenotype.getClassSchema.getField(field.adamKey)
  field.vcfKey -> (avroField.pos, field.attrConverter)
})(collection.breakOut)

/**
 * Copies recognized INFO attributes from a GATK VariantContext into the given
 * VariantCallingAnnotations record (mutated in place via Avro positional put)
 * and returns it. Missing values (".") and absent keys are skipped.
 */
private def convertAttributes(vc: VariantContext, call : VariantCallingAnnotations): VariantCallingAnnotations = {
  VCF2VarCallAnnos.foreach { case (vcfKey, (fieldPos, convert)) =>
    val raw = vc.getAttribute(vcfKey)
    if (raw != null && raw != VCFConstants.MISSING_VALUE_v4) {
      call.put(fieldPos, convert(raw))
    }
  }
  call
}

// One conversion method for each way of representing an Allele that
// GATK has, plus to convert based on actual alleles vs. the calls
// (ref/alt/nocall) in the genotypes.
/**
 * Classifies a GATK Allele as an ADAM genotype allele: no-call maps to NoCall,
 * a reference allele to Ref, and anything else to Alt.
 */
private def convertAllele(allele: Allele): ADAMGenotypeAllele = {
  if (allele.isNoCall) ADAMGenotypeAllele.NoCall
  else if (allele.isReference) ADAMGenotypeAllele.Ref
  else ADAMGenotypeAllele.Alt
}

/**
 * Wraps an allele string as a single-element Seq of GATK Allele, or an empty
 * Seq when the input is null (e.g. no alternate allele recorded).
 *
 * @param allele allele bases, possibly null
 * @param isRef whether this is the reference allele
 */
private def convertAllele(allele: CharSequence, isRef: Boolean = false): Seq[Allele] = {
  Option(allele).toSeq.map(a => Allele.create(a.toString, isRef))
}

/**
 * Builds the site-level allele collection for an ADAMVariant: the reference
 * allele (marked as reference) followed by the alternate allele, each omitted
 * when null.
 */
private def convertAlleles(v: ADAMVariant): java.util.Collection[Allele] = {
  val ref = convertAllele(v.getReferenceAllele, isRef = true)
  val alt = convertAllele(v.getVariantAllele)
  ref ++ alt
}

/**
 * Expands an ADAMGenotype's allele calls into concrete GATK Alleles, resolving
 * Ref/Alt against the genotype's variant record and NoCall to Allele.NO_CALL.
 */
private def convertAlleles(g: ADAMGenotype): java.util.List[Allele] = {
  g.getAlleles.map {
    case ADAMGenotypeAllele.NoCall => Allele.NO_CALL
    case ADAMGenotypeAllele.Ref    => Allele.create(g.getVariant.getReferenceAllele.toString, true)
    case ADAMGenotypeAllele.Alt    => Allele.create(g.getVariant.getVariantAllele.toString)
  }
}
}

/**
* This class converts VCF data to and from ADAM. This translation occurs at the abstraction level
Expand All @@ -33,15 +138,16 @@ import scala.collection.JavaConverters._
* If an annotation has a corresponding set of fields in the VCF standard, a conversion to/from the
* GATK VariantContext should be implemented in this class.
*/
private[adam] class VariantContextConverter extends Serializable with Logging {
class VariantContextConverter(dict: Option[SequenceDictionary] = None) extends Serializable {

/** Classifies a GATK Allele (NoCall / Ref / Alt) for this converter instance;
  * no-call takes precedence over the reference check. */
private def convertAllele(allele: Allele): ADAMGenotypeAllele =
  (allele.isNoCall, allele.isReference) match {
    case (true, _) => ADAMGenotypeAllele.NoCall
    case (_, true) => ADAMGenotypeAllele.Ref
    case _         => ADAMGenotypeAllele.Alt
  }

/**
 * Implicitly converts a list of GATK Alleles into ADAM genotype alleles,
 * element-wise via convertAllele. (Reconstructed: the diff left both the old
 * `util.List` signature and the new `java.util.List` one in place.)
 */
implicit def gatkAllelesToADAMAlleles(gatkAlleles : java.util.List[Allele])
    : java.util.List[ADAMGenotypeAllele] = {
  gatkAlleles.map(convertAllele(_))
}

Expand All @@ -51,7 +157,8 @@ private[adam] class VariantContextConverter extends Serializable with Logging {
* @param vc GATK Variant context to convert.
* @return ADAM variant contexts
*/
def convert(vc: VariantContext): Seq[ADAMVariantContext] = {
def convert(vc:VariantContext): Seq[ADAMVariantContext] = {

var contigId = 0;
// This is really ugly - only temporary until we remove numeric
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a sequence dictionary is provided (which seems to be provided in the constructor), then shouldn't we be able to get the contigId from the chr name and the sequence dictionary? That might be a bit cleaner?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Er, nevermind—ignore this comment. I see code relating to that is about 10 lines below here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand though -- it seems like all the contigId fields are getting set to '0'?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only if they can't be parsed.

// IDs from our representation of contigs.
Expand All @@ -61,49 +168,119 @@ private[adam] class VariantContextConverter extends Serializable with Logging {
case ex:NumberFormatException => {
}
}
val contig: ADAMContig = ADAMContig.newBuilder()
.setContigId(contigId)

val contig: ADAMContig.Builder = ADAMContig.newBuilder()
.setContigName(vc.getChr)
.build
.setContigId(contigId)

if (dict.isDefined) {
val sr = (dict.get)(vc.getChr)
contig.setContigLength(sr.length).setReferenceURL(sr.url).setContigMD5(sr.md5)
}

// TODO: Handle multi-allelic sites
// We need to split the alleles (easy) and split and subset the PLs (harder)/update the genotype
if (!vc.isBiallelic) {
return Seq()
}

// VCF CHROM, POS, REF and ALT
val variant: ADAMVariant = ADAMVariant.newBuilder
.setContig(contig)
.setContig(contig.build)
.setPosition(vc.getStart - 1 /* ADAM is 0-indexed */)
.setReferenceAllele(vc.getReference.getBaseString)
.setVariantAllele(vc.getAlternateAllele(0).getBaseString)
.build

val genotypes: Seq[ADAMGenotype] = vc.getGenotypes.iterator.asScala.map((g: Genotype) => {
val genotype: ADAMGenotype.Builder = ADAMGenotype.newBuilder
.setVariant(variant)
val shared_genotype_builder: ADAMGenotype.Builder = ADAMGenotype.newBuilder
.setVariant(variant)

val call : VariantCallingAnnotations.Builder =
VariantCallingAnnotations.newBuilder

// VCF QUAL, FILTER and INFO fields
if (vc.hasLog10PError) {
call.setVariantCallErrorProbability(vc.getPhredScaledQual.asInstanceOf[Float])
}

if (vc.isFiltered) { // not PASSing
call.setVariantIsPassing(!vc.isFiltered.booleanValue)
.setVariantFilters(new java.util.ArrayList(vc.getFilters))
} else {
/* otherwise, filters were applied and the variant passed, or no
* filters were appled */
call.setVariantIsPassing(true)
}
shared_genotype_builder.setVariantCallingAnnotations(VariantContextConverter.convertAttributes(vc, call.build()))

// VCF Genotypes
val shared_genotype = shared_genotype_builder.build
val genotypes: Seq[ADAMGenotype] = vc.getGenotypes.map((g: Genotype) => {
val genotype: ADAMGenotype.Builder = ADAMGenotype.newBuilder(shared_genotype)
.setSampleId(g.getSampleName)
.setAlleles(gatkAllelesToADAMAlleles(g.getAlleles))
.setAlleles(g.getAlleles.map(convertAllele(_)))
.setIsPhased(g.isPhased)

if (vc.isFiltered) {
val annotations : VariantCallingAnnotations.Builder =
VariantCallingAnnotations.newBuilder
.setVariantIsPassing(!vc.isFiltered.booleanValue)
.setVariantFilters(new util.ArrayList(vc.getFilters))
genotype.setVariantCallingAnnotations(annotations.build)
if (g.hasGQ) genotype.setGenotypeQuality(g.getGQ)
if (g.hasDP) genotype.setReadDepth(g.getDP)
if (g.hasAD) {
val ad = g.getAD
assert(ad.length == 2, "Unexpected number of allele depths for bi-allelic variant")
genotype.setReferenceReadDepth(ad(0)).setAlternateReadDepth(ad(1))
}
if (g.hasPL) genotype.setGenotypeLikelihoods(g.getPL.toList.map(p => p:java.lang.Integer))

if (g.hasExtendedAttribute(VCFConstants.PHASE_QUALITY_KEY))
genotype.setPhaseQuality(g.getExtendedAttribute(VCFConstants.PHASE_QUALITY_KEY).asInstanceOf[java.lang.Integer])

if (g.hasExtendedAttribute(VCFConstants.PHASE_SET_KEY))
genotype.setPhaseSetId(Integer.parseInt(g.getExtendedAttribute(VCFConstants.PHASE_SET_KEY).asInstanceOf[java.lang.String]))

genotype.build
val built_genotype = genotype.build
for ((v,a) <- VariantContextConverter.VCF2GTAnnos) { // Add extended attributes if present
val attr = g.getExtendedAttribute(v)
if (attr != null && attr != VCFConstants.MISSING_VALUE_v4) {
built_genotype.put(a._1, a._2(attr))
}
}
built_genotype
}).toSeq

val referencePosition = new ReferencePosition(contig.getContigId, variant.getPosition)
Seq(ADAMVariantContext((referencePosition, variant, genotypes)))
Seq(ADAMVariantContext((ReferencePosition(variant), variant, genotypes)))
}

/**
* Convert an ADAMVariantContext into the equivalent GATK VariantContext
* @param vc
* @return GATK VariantContext
*/
/**
 * Converts an ADAMVariantContext into the equivalent GATK VariantContext.
 *
 * @param vc ADAM variant context to convert
 * @return GATK VariantContext carrying the site, alleles, dbSNP id and genotypes
 */
def convert(vc:ADAMVariantContext):VariantContext = {
  val variant : ADAMVariant = vc.variant
  // VCF coordinates are 1-based and stop is inclusive, hence the +1 / length-1.
  val vcb = new VariantContextBuilder()
    .chr(variant.getContig.getContigName.toString)
    .start(variant.getPosition + 1 /* Recall ADAM is 0-indexed */)
    .stop(variant.getPosition + 1 + variant.getReferenceAllele.length - 1)
    .alleles(VariantContextConverter.convertAlleles(variant))

  // Attach a dbSNP id ("rs...") when one is recorded in the databases.
  vc.databases.flatMap(d => Option(d.getDbsnpId)).foreach(d => vcb.id("rs" + d))

  // TODO: Extract provenance INFO fields
  vcb.genotypes(vc.genotypes.map(g => {
    val gb = new GenotypeBuilder(g.getSampleId.toString, VariantContextConverter.convertAlleles(g))

    // Optional per-genotype fields: only set those present in the ADAM record.
    Option(g.getIsPhased).foreach(gb.phased(_))
    Option(g.getGenotypeQuality).foreach(gb.GQ(_))
    Option(g.getReadDepth).foreach(gb.DP(_))
    if (g.getReferenceReadDepth != null && g.getAlternateReadDepth != null)
      gb.AD(Array(g.getReferenceReadDepth, g.getAlternateReadDepth))
    if (g.variantCallingAnnotations != null) {
      val callAnnotations = g.getVariantCallingAnnotations()
      if (callAnnotations.variantFilters != null)
        gb.filters(callAnnotations.variantFilters.map(_.toString))
    }

    if (g.getGenotypeLikelihoods.nonEmpty)
      gb.PL(g.getGenotypeLikelihoods.map(p => p:Int).toArray)

    gb.make
  }))

  vcb.make
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ object ADAMVariantContext {
apply((ReferencePosition(v), v, Seq()))
}

/**
 * Builds an ADAMVariantContext from a variant and its genotype calls.
 *
 * The reference position is derived from the variant itself.
 *
 * @param v variant whose position anchors the context
 * @param genotypes genotype calls observed at the variant's site
 * @return the assembled ADAMVariantContext
 */
def apply(v : ADAMVariant, genotypes : Seq[ADAMGenotype]) : ADAMVariantContext =
  apply((ReferencePosition(v), v, genotypes))

/**
* Builds a variant context off of a set of genotypes. Builds variants from the genotypes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,20 +359,21 @@ object SequenceDictionary {
* @param name
* @param length
* @param url
* @param md5
*/
class SequenceRecord(val id: Int, val name: CharSequence, val length: Long, val url: CharSequence) extends Serializable {
class SequenceRecord(val id: Int, val name: CharSequence, val length: Long, val url: CharSequence, val md5: CharSequence) extends Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to adding the checksum.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're adding the md5 field, I think this should also go into the ADAMRecord schema as a new field -- as well as any other schema that maintains the flattened set of reference* fields, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a todo to unify them around ADAMContig, as we chatted about in IRC today.


assert(name != null, "SequenceRecord.name is null")
assert(name.length > 0, "SequenceRecord.name has length 0")
assert(length > 0, "SequenceRecord.length <= 0")

// Returns a copy of this record with the reference id replaced; every other
// attribute (name, length, url, md5) is preserved. (Reconstructed: the diff
// left the pre-md5 constructor call alongside the new five-argument one.)
def withReferenceId(newId: Int): SequenceRecord =
  new SequenceRecord(newId, name, length, url, md5)

// Structural equality over all identifying attributes, including the md5
// checksum added alongside this change. (Reconstructed: the diff left the
// pre-md5 comparison line alongside the new one.)
// NOTE(review): hashCode is not visible in this chunk — confirm it stays
// consistent with this equals definition.
override def equals(x: Any): Boolean = {
  x match {
    case y: SequenceRecord =>
      id == y.id && name == y.name && length == y.length && url == y.url && md5 == y.md5
    case _ => false
  }
}
Expand Down Expand Up @@ -402,8 +403,9 @@ class SequenceRecord(val id: Int, val name: CharSequence, val length: Long, val

object SequenceRecord {

// Factory mirroring the primary constructor; url and md5 default to null for
// records built from sources lacking that metadata. (Reconstructed: the diff
// left the pre-md5 overload alongside the new one.)
def apply(id: Int, name: CharSequence, length: Long, url: CharSequence = null, md5: CharSequence = null): SequenceRecord =
  new SequenceRecord(id, name, length, url, md5)


/**
* Converts an ADAM contig into a sequence record.
Expand Down Expand Up @@ -480,7 +482,8 @@ object SequenceRecord {
rec.get(schema.getField("referenceId").pos()).asInstanceOf[Int],
rec.get(schema.getField("referenceName").pos()).asInstanceOf[CharSequence],
rec.get(schema.getField("referenceLength").pos()).asInstanceOf[Long],
rec.get(schema.getField("referenceUrl").pos()).asInstanceOf[CharSequence])
rec.get(schema.getField("referenceUrl").pos()).asInstanceOf[CharSequence],
null)
} else if (schema.getField("contig") != null) {
val pos = schema.getField("contig").pos()
val contig = rec.get(pos).asInstanceOf[ADAMContig]
Expand All @@ -489,7 +492,8 @@ object SequenceRecord {
contig.get(contigSchema.getField("contigId").pos()).asInstanceOf[Int],
contig.get(contigSchema.getField("contigName").pos()).asInstanceOf[CharSequence],
contig.get(contigSchema.getField("contigLength").pos()).asInstanceOf[Long],
contig.get(contigSchema.getField("referenceURL").pos()).asInstanceOf[CharSequence])
contig.get(contigSchema.getField("referenceURL").pos()).asInstanceOf[CharSequence],
contig.get(contigSchema.getField("contigMD5").pos()).asInstanceOf[CharSequence])
} else {
null
}
Expand Down
Loading