Skip to content

Commit

Permalink
Read VCF header from stream in VCFOutFormatter.
Browse files Browse the repository at this point in the history
  • Loading branch information
heuermh committed Jun 9, 2017
1 parent ad5ae6d commit 1cf2733
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import htsjdk.variant.variantcontext.{
VariantContextBuilder
}
import htsjdk.variant.vcf.{
VCFCompoundHeaderLine,
VCFConstants,
VCFInfoHeaderLine,
VCFFormatHeaderLine,
VCFHeader,
VCFHeaderLine,
VCFHeaderLineCount,
VCFHeaderLineType
VCFHeaderLineType,
VCFInfoHeaderLine
}
import java.util.Collections
import org.bdgenomics.utils.misc.Logging
Expand All @@ -45,6 +47,7 @@ import org.bdgenomics.adam.models.{
}
import org.bdgenomics.adam.util.PhredUtils
import org.bdgenomics.formats.avro._
import org.slf4j.Logger
import scala.collection.JavaConversions._
import scala.collection.mutable.{ Buffer, HashMap }

Expand Down Expand Up @@ -150,6 +153,74 @@ object VariantContextConverter {
case GenotypeAllele.ALT => Allele.create(g.getVariant.getAlternateAllele)
}
}

/**
 * Reconciles provided VCF header lines against the header lines that ADAM
 * supports by default.
 *
 * Exact duplicate lines are removed first. A provided INFO/FORMAT line whose
 * ID matches a supported default line is dropped here and replaced by the
 * default (all of `DefaultHeaderLines.allHeaderLines` is appended at the end).
 * If the provided line's declared type conflicts with the default's type, the
 * outcome depends on `stringency`: STRICT throws IllegalArgumentException,
 * LENIENT logs a warning and keeps a copy renamed with a "BAD_" prefix, and
 * SILENT keeps the renamed copy quietly.
 *
 * @param headerLines Header lines to clean.
 * @param stringency Validation stringency governing type conflicts.
 * @param log Logger used to warn on conflicts under LENIENT stringency.
 * @return The cleaned lines followed by all of ADAM's supported default lines.
 */
private[adam] def cleanAndMixInSupportedLines(
  headerLines: Seq[VCFHeaderLine],
  stringency: ValidationStringency,
  log: Logger): Seq[VCFHeaderLine] = {

  // dedupe exact duplicates before reconciling against the supported set
  val deduped = headerLines.distinct

  // Audits one provided compound (INFO/FORMAT) line against the matching
  // supported default line. Returns None when the types agree — the caller
  // drops the provided line and relies on the default mixed in below — or a
  // "BAD_"-renamed copy built by replaceFn when the types conflict.
  def auditLine(line: VCFCompoundHeaderLine,
                defaultLine: VCFCompoundHeaderLine,
                replaceFn: (String, VCFCompoundHeaderLine) => VCFCompoundHeaderLine): Option[VCFCompoundHeaderLine] = {
    if (line.getType != defaultLine.getType) {
      val msg = "Field type for provided header line (%s) does not match supported line (%s)".format(
        line, defaultLine)
      if (stringency == ValidationStringency.STRICT) {
        throw new IllegalArgumentException(msg)
      } else {
        if (stringency == ValidationStringency.LENIENT) {
          log.warn(msg)
        }
        // NOTE(review): the renamed copy forwards getCountType but no fixed
        // integer count — confirm lines declared with Number=<n> survive this.
        Some(replaceFn("BAD_%s".format(line.getID), line))
      }
    } else {
      None
    }
  }

  // remove our supported header lines, auditing those that collide by ID;
  // a type ascription widens Some(x) safely — no asInstanceOf cast needed
  deduped.flatMap(line => line match {
    case fl: VCFFormatHeaderLine => {
      val key = fl.getID
      DefaultHeaderLines.formatHeaderLines
        .find(_.getID == key)
        .fold(Some(fl): Option[VCFCompoundHeaderLine])(defaultLine => {
          auditLine(fl, defaultLine, (newId, oldLine) => {
            new VCFFormatHeaderLine(newId,
              oldLine.getCountType,
              oldLine.getType,
              oldLine.getDescription)
          })
        })
    }
    case il: VCFInfoHeaderLine => {
      val key = il.getID
      DefaultHeaderLines.infoHeaderLines
        .find(_.getID == key)
        .fold(Some(il): Option[VCFCompoundHeaderLine])(defaultLine => {
          auditLine(il, defaultLine, (newId, oldLine) => {
            new VCFInfoHeaderLine(newId,
              oldLine.getCountType,
              oldLine.getType,
              oldLine.getDescription)
          })
        })
    }
    // FILTER and other line kinds pass through untouched
    case l => {
      Some(l)
    }
  }) ++ DefaultHeaderLines.allHeaderLines
}

/**
 * Extracts all header lines from a VCF header.
 *
 * @param header VCF header to pull lines from.
 * @return The FILTER, FORMAT, INFO, and other header lines, in that order,
 *   as a single Scala Seq.
 */
private[adam] def headerLines(header: VCFHeader): Seq[VCFHeaderLine] = {
  // gather each category separately (Java collections convert implicitly),
  // then concatenate in the same order as before
  val filterLines: Seq[VCFHeaderLine] = header.getFilterLines.toSeq
  val formatLines: Seq[VCFHeaderLine] = header.getFormatHeaderLines.toSeq
  val infoLines: Seq[VCFHeaderLine] = header.getInfoHeaderLines.toSeq
  val otherLines: Seq[VCFHeaderLine] = header.getOtherHeaderLines.toSeq
  filterLines ++ formatLines ++ infoLines ++ otherLines
}
}

/**
Expand Down
72 changes: 3 additions & 69 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.converters.VariantContextConverter._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
import org.bdgenomics.adam.models._
Expand Down Expand Up @@ -229,73 +230,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
new Path(pathName)))
}

/**
 * Reconciles provided VCF header lines against the header lines that ADAM
 * supports by default.
 *
 * Duplicates are removed; an INFO/FORMAT line whose ID matches a supported
 * default is dropped (the defaults are appended at the end). On a type
 * conflict: STRICT throws, LENIENT warns and renames with a "BAD_" prefix,
 * SILENT renames quietly. Logging uses the enclosing class's Logging trait.
 *
 * @param headerLines Header lines to clean.
 * @param stringency Validation stringency governing type conflicts.
 * @return The cleaned lines plus all supported default lines.
 */
private def cleanAndMixInSupportedLines(
headerLines: Seq[VCFHeaderLine],
stringency: ValidationStringency): Seq[VCFHeaderLine] = {

// drop exact duplicate header lines
val deduped = headerLines.distinct

// Returns None when the provided line's type matches the supported default
// (the default is re-added below), or a "BAD_"-renamed copy on conflict.
def auditLine(line: VCFCompoundHeaderLine,
defaultLine: VCFCompoundHeaderLine,
replaceFn: (String, VCFCompoundHeaderLine) => VCFCompoundHeaderLine): Option[VCFCompoundHeaderLine] = {
if (line.getType != defaultLine.getType) {
val msg = "Field type for provided header line (%s) does not match supported line (%s)".format(
line, defaultLine)
if (stringency == ValidationStringency.STRICT) {
throw new IllegalArgumentException(msg)
} else {
if (stringency == ValidationStringency.LENIENT) {
log.warn(msg)
}
Some(replaceFn("BAD_%s".format(line.getID), line))
}
} else {
None
}
}

// remove our supported header lines, auditing ID collisions per kind
deduped.flatMap(line => line match {
case fl: VCFFormatHeaderLine => {
val key = fl.getID
DefaultHeaderLines.formatHeaderLines
.find(_.getID == key)
.fold(Some(fl).asInstanceOf[Option[VCFCompoundHeaderLine]])(defaultLine => {
auditLine(fl, defaultLine, (newId, oldLine) => {
new VCFFormatHeaderLine(newId,
oldLine.getCountType,
oldLine.getType,
oldLine.getDescription)
})
})
}
case il: VCFInfoHeaderLine => {
val key = il.getID
DefaultHeaderLines.infoHeaderLines
.find(_.getID == key)
.fold(Some(il).asInstanceOf[Option[VCFCompoundHeaderLine]])(defaultLine => {
auditLine(il, defaultLine, (newId, oldLine) => {
new VCFInfoHeaderLine(newId,
oldLine.getCountType,
oldLine.getType,
oldLine.getDescription)
})
})
}
// non-compound lines (e.g. FILTER) pass through unchanged
case l => {
Some(l)
}
}) ++ DefaultHeaderLines.allHeaderLines
}

/**
 * Extracts all header lines from a VCF header.
 *
 * @param header VCF header to pull lines from.
 * @return The FILTER, FORMAT, INFO, and other header lines, in that order,
 *   as a single Scala Seq (Java collections convert via JavaConversions).
 */
private def headerLines(header: VCFHeader): Seq[VCFHeaderLine] = {
(header.getFilterLines ++
header.getFormatHeaderLines ++
header.getInfoHeaderLines ++
header.getOtherHeaderLines).toSeq
}

private def loadHeaderLines(pathName: String): Seq[VCFHeaderLine] = {
getFsAndFilesWithFilter(pathName, new FileFilter("_header"))
.map(p => headerLines(readVcfHeader(p.toString)))
Expand Down Expand Up @@ -1139,7 +1073,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
VariantContextRDD(records.flatMap(p => vcc.convert(p._2.get)),
sd,
samples,
cleanAndMixInSupportedLines(headers, stringency))
cleanAndMixInSupportedLines(headers, stringency, log))
}

/**
Expand Down Expand Up @@ -1185,7 +1119,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
VariantContextRDD(records.flatMap(p => vcc.convert(p._2.get)),
sd,
samples,
cleanAndMixInSupportedLines(headers, stringency))
cleanAndMixInSupportedLines(headers, stringency, log))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,26 @@ package org.bdgenomics.adam.rdd.variant
import htsjdk.samtools.ValidationStringency
import htsjdk.variant.vcf.{
VCFCodec,
VCFHeader,
VCFHeaderLine
}
import htsjdk.tribble.readers.{
AsciiLineReader,
AsciiLineReaderIterator
}
import java.io.InputStream
import org.bdgenomics.adam.converters.VariantContextConverter._
import org.bdgenomics.adam.converters.VariantContextConverter
import org.bdgenomics.adam.models.VariantContext
import org.bdgenomics.adam.rdd.OutFormatter
import org.bdgenomics.utils.misc.Logging
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

/**
* OutFormatter that reads streaming VCF.
*/
case class VCFOutFormatter(headerLines: Seq[VCFHeaderLine]) extends OutFormatter[VariantContext] {
case class VCFOutFormatter extends OutFormatter[VariantContext] with Logging {

/**
* Reads VariantContexts from an input stream. Autodetects VCF format.
Expand All @@ -46,18 +49,20 @@ case class VCFOutFormatter(headerLines: Seq[VCFHeaderLine]) extends OutFormatter
*/
def read(is: InputStream): Iterator[VariantContext] = {

// make converter and empty dicts
val converter = new VariantContextConverter(headerLines,
ValidationStringency.LENIENT)

// make line reader iterator
val lri = new AsciiLineReaderIterator(new AsciiLineReader(is))

// make reader
val codec = new VCFCodec()

// read the header, and then throw it away
val header = codec.readActualHeader(lri)
// read the header
val header = codec.readActualHeader(lri).asInstanceOf[VCFHeader]

// merge header lines with our supported header lines
val lines = cleanAndMixInSupportedLines(headerLines(header), ValidationStringency.LENIENT, log)

// make converter
val converter = new VariantContextConverter(lines, ValidationStringency.LENIENT)

@tailrec def convertIterator(iter: AsciiLineReaderIterator,
records: ListBuffer[VariantContext] = ListBuffer.empty): Iterator[VariantContext] = {
Expand Down
Loading

0 comments on commit 1cf2733

Please sign in to comment.