Added support for Indexed VCF files #1095

Closed · wants to merge 2 commits
50 changes: 46 additions & 4 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -19,8 +19,9 @@ package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }
import java.util.regex.Pattern
import com.google.common.collect.ImmutableList
import htsjdk.samtools.{ SAMFileHeader, ValidationStringency }
import htsjdk.samtools.util.Locatable
import htsjdk.samtools.util.{ Locatable, Interval }
import htsjdk.variant.vcf.VCFHeader
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
@@ -596,16 +597,23 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
))
}

private def readVcfRecords(filePath: String): RDD[(LongWritable, VariantContextWritable)] = {
private def readVcfRecords(filePath: String, viewRegion: Option[ReferenceRegion]): RDD[(LongWritable, VariantContextWritable)] = {
// load vcf data
val job = HadoopUtil.newJob(sc)
job.getConfiguration().setStrings("io.compression.codecs",
classOf[BGZFCodec].getCanonicalName(),
classOf[BGZFEnhancedGzipCodec].getCanonicalName())

val conf = ContextUtil.getConfiguration(job)
viewRegion.foreach(region => {
// restrict the scan to records overlapping the query region;
// the region bounds must fit in an Int for the htsjdk Interval
val interval = new Interval(region.referenceName, region.start.toInt, region.end.toInt)
VCFInputFormat.setIntervals(conf, ImmutableList.of(interval))
})

sc.newAPIHadoopFile(
filePath,
classOf[VCFInputFormat], classOf[LongWritable], classOf[VariantContextWritable],
ContextUtil.getConfiguration(job)
conf
)
}

@@ -623,7 +631,41 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val vcc = new VariantContextConverter(sdOpt)

// load records from VCF
val records = readVcfRecords(filePath)
val records = readVcfRecords(filePath, None)

// attach instrumentation
if (Metrics.isRecording) records.instrument() else records

// load vcf metadata
val (vcfSd, samples) = loadVcfMetadata(filePath)

// a provided sequence dictionary may only replace the VCF's sequence info if the VCF itself has none
require(sdOpt.isEmpty || vcfSd.isEmpty,
"Only one of the provided or VCF sequence dictionaries may be specified.")
val sd = sdOpt.getOrElse(vcfSd)

VariantContextRDD(records.flatMap(p => vcc.convert(p._2.get)),
sd,
samples)
}

/**
* Loads a VCF file indexed by a tabix (tbi) file into an RDD.
*
* @param filePath The file to load.
* @param viewRegion The ReferenceRegion we are filtering on.
* @param sdOpt An optional sequence dictionary, in case the sequence info
* is not included in the VCF.
* @return Returns a VariantContextRDD.
*/
def loadIndexedVcf(filePath: String,
viewRegion: ReferenceRegion,
sdOpt: Option[SequenceDictionary] = None): VariantContextRDD = {

val vcc = new VariantContextConverter(sdOpt)

// load records from VCF
val records = readVcfRecords(filePath, Some(viewRegion))

// attach instrumentation
if (Metrics.isRecording) records.instrument() else records
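Taken together, these changes let a region predicate be pushed down to the input format via the tabix index. A minimal usage sketch, assuming a bgzipped VCF with a .tbi index next to it and a SparkContext sc with the ADAMContext implicits imported; the path and region values are illustrative:

import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.ADAMContext._

// only records overlapping 22:0-16050822 are materialized; the tabix
// index lets the reader skip compressed blocks outside the region
val region = ReferenceRegion("22", 0L, 16050822L)
val variants = sc.loadIndexedVcf("sample.vcf.gz", region)
println(variants.rdd.count())

Per the doc comment above, the optional sdOpt argument only matters when the VCF header carries no sequence info.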
@@ -27,6 +27,7 @@ import org.bdgenomics.adam.models.{
SequenceDictionary
}
import org.bdgenomics.adam.rdd.{ AvroGenomicRDD, JavaSaveArgs }
import org.bdgenomics.adam.util.ReferenceFile
import org.bdgenomics.formats.avro.{ AlignmentRecord, NucleotideContigFragment }
import scala.collection.JavaConversions._
import scala.math.max
@@ -55,7 +56,7 @@ object NucleotideContigFragmentRDD extends Serializable {

case class NucleotideContigFragmentRDD(
rdd: RDD[NucleotideContigFragment],
sequences: SequenceDictionary) extends AvroGenomicRDD[NucleotideContigFragment, NucleotideContigFragmentRDD] {
sequences: SequenceDictionary) extends AvroGenomicRDD[NucleotideContigFragment, NucleotideContigFragmentRDD] with ReferenceFile {

/**
* Converts an RDD of nucleotide contig fragments into reads. Adjacent contig fragments are
@@ -169,7 +170,7 @@ case class NucleotideContigFragmentRDD(
* @param region Reference region over which to get sequence.
* @return String of bases corresponding to reference sequence.
*/
def getReferenceString(region: ReferenceRegion): String = {
def extract(region: ReferenceRegion): String = {
def getString(fragment: (ReferenceRegion, NucleotideContigFragment)): (ReferenceRegion, String) = {
val trimStart = max(0, region.start - fragment._1.start).toInt
val trimEnd = max(0, fragment._1.end - region.end).toInt
@@ -20,10 +20,15 @@ package org.bdgenomics.adam.util
import org.apache.spark.rdd.RDD
// NOTE(ryan): this is necessary for Spark <= 1.2.1.
import org.apache.spark.SparkContext._
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.models.{ ReferenceRegion, SequenceDictionary, SequenceRecord }
import org.bdgenomics.formats.avro.NucleotideContigFragment

case class ReferenceContigMap(contigMap: Map[String, Seq[NucleotideContigFragment]]) extends ReferenceFile {

// create sequence dictionary
val sequences: SequenceDictionary = new SequenceDictionary(contigMap.map(r =>
SequenceRecord(r._1, r._2.map(_.getFragmentEndPosition).max)).toVector)

/**
* Extract reference sequence from the file.
*
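Because the new dictionary is computed from the maximum getFragmentEndPosition per contig, every fragment fed into a ReferenceContigMap needs its end position set (the MDTaggingSuite change below does exactly that). A small hand-built sketch; the contig name and bases are made up:

import org.bdgenomics.adam.util.ReferenceContigMap
import org.bdgenomics.formats.avro.{ Contig, NucleotideContigFragment }

// a single 7 bp fragment; the dictionary length for chr1
// comes from the fragment's end position
val fragment = NucleotideContigFragment.newBuilder()
  .setContig(Contig.newBuilder().setContigName("chr1").build())
  .setFragmentStartPosition(0L)
  .setFragmentEndPosition(7L)
  .setFragmentSequence("ACTGTAC")
  .build()
val refMap = ReferenceContigMap(Map("chr1" -> Seq(fragment)))
assert(refMap.sequences.records.head.length == 7L)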
@@ -18,7 +18,7 @@

package org.bdgenomics.adam.util

import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion }

/**
* File that contains a reference assembly that can be broadcasted
@@ -31,4 +31,10 @@ trait ReferenceFile extends Serializable {
* @return The reference sequence at the desired locus.
*/
def extract(region: ReferenceRegion): String

/**
* The sequence dictionary describing this reference's contigs.
*/
def sequences: SequenceDictionary

}
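Any class mixing in ReferenceFile must now supply a SequenceDictionary alongside extract. A minimal hypothetical implementation backed by one in-memory contig, for illustration only (the class name, contig, and bases are invented):

import org.bdgenomics.adam.models.{ ReferenceRegion, SequenceDictionary, SequenceRecord }

// toy single-contig reference; real implementations read from storage
case class InMemoryReference(name: String, bases: String) extends ReferenceFile {

  // one record whose length is just the number of bases held
  val sequences: SequenceDictionary =
    new SequenceDictionary(Vector(SequenceRecord(name, bases.length.toLong)))

  def extract(region: ReferenceRegion): String = {
    require(region.referenceName == name, s"unknown contig ${region.referenceName}")
    bases.substring(region.start.toInt, region.end.toInt)
  }
}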
@@ -22,7 +22,7 @@ import java.nio.{ ByteOrder, ByteBuffer }
import com.esotericsoftware.kryo.io.{ Output, Input }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import org.bdgenomics.utils.io.{ ByteArrayByteAccess, ByteAccess }
import org.bdgenomics.adam.models.{ NonoverlappingRegions, ReferencePosition, ReferenceRegion }
import org.bdgenomics.adam.models._

object TwoBitFile {
val MAGIC_NUMBER: Int = 0x1A412743
@@ -67,7 +67,10 @@ class TwoBitFile(byteAccess: ByteAccess) extends ReferenceFile {
indexRecordStart += TwoBitFile.NAME_SIZE_SIZE + tup._1.length + TwoBitFile.OFFSET_SIZE
tup
}).toMap
val seqRecords = seqRecordStarts.map(tup => tup._1 -> TwoBitRecord(bytes, tup._1, tup._2)).toMap
val seqRecords = seqRecordStarts.map(tup => tup._1 -> TwoBitRecord(bytes, tup._1, tup._2))

// create sequence dictionary
val sequences = new SequenceDictionary(seqRecords.toVector.map(r => SequenceRecord(r._1, r._2.dnaSize)))

private def readHeader(): Int = {
// figure out proper byte order
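With TwoBitFile now exposing sequences, the dictionary comes straight off a local .2bit file. A short sketch; the file path is illustrative:

import java.io.File
import org.bdgenomics.adam.util.TwoBitFile
import org.bdgenomics.utils.io.LocalFileByteAccess

// the dictionary is built eagerly from the 2bit index records
val twoBit = new TwoBitFile(new LocalFileByteAccess(new File("hg19.chrM.2bit")))
twoBit.sequences.records.foreach(r => println(s"${r.name}: ${r.length} bp"))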
Binary file added adam-core/src/test/resources/bqsr1.vcf.tbi
@@ -423,5 +423,12 @@ class ADAMContextSuite extends ADAMFunSuite {
val reads = sc.loadIndexedBam(path, refRegion)
assert(reads.rdd.count == 1)
}

sparkTest("loadIndexedVcf") {
val path = resourcePath("bqsr1.vcf")
val refRegion = ReferenceRegion("22", 0, 16050822)
val variants = sc.loadIndexedVcf(path, refRegion)
assert(variants.rdd.count == 8)
}
}

@@ -17,13 +17,13 @@
*/
package org.bdgenomics.adam.rdd.contig

import com.google.common.io.Files
import java.io.File
import org.apache.spark.rdd.RDD

import com.google.common.io.Files
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.formats.avro._

import scala.collection.mutable.ListBuffer

class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {
@@ -76,7 +76,7 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {

val rdd = NucleotideContigFragmentRDD(sc.parallelize(List(fragment)))

assert(rdd.getReferenceString(region) === "ACTGTAC")
assert(rdd.extract(region) === "ACTGTAC")
}

sparkTest("recover trimmed reference string from a single contig fragment") {
@@ -97,7 +97,7 @@

val rdd = NucleotideContigFragmentRDD(sc.parallelize(List(fragment)))

assert(rdd.getReferenceString(region) === "CTGTA")
assert(rdd.extract(region) === "CTGTA")
}

sparkTest("recover reference string from multiple contig fragments") {
@@ -143,8 +143,8 @@
fragment1,
fragment2)))

assert(rdd.getReferenceString(region0) === "ACTGTAC")
assert(rdd.getReferenceString(region1) === "GTACTCTCATG")
assert(rdd.extract(region0) === "ACTGTAC")
assert(rdd.extract(region1) === "GTACTCTCATG")
}

sparkTest("recover trimmed reference string from multiple contig fragments") {
@@ -190,8 +190,8 @@
fragment1,
fragment2)))

assert(rdd.getReferenceString(region0) === "CTGTA")
assert(rdd.getReferenceString(region1) === "CTCTCA")
assert(rdd.extract(region0) === "CTGTA")
assert(rdd.extract(region1) === "CTCTCA")
}

sparkTest("testing nondeterminism from reduce when recovering referencestring") {
@@ -213,7 +213,7 @@
var passed = true
val rdd = NucleotideContigFragmentRDD(sc.parallelize(fragments.toList))
try {
val result = rdd.getReferenceString(new ReferenceRegion("chr1", 0L, 1000L))
val result = rdd.extract(new ReferenceRegion("chr1", 0L, 1000L))
} catch {
case e: AssertionError => passed = false
}
@@ -18,9 +18,8 @@
package org.bdgenomics.adam.rdd.read

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.util.{ ReferenceContigMap, ADAMFunSuite }
import org.bdgenomics.formats.avro.{ AlignmentRecord, NucleotideContigFragment, Contig }
import org.bdgenomics.utils.misc.SparkFunSuite
import org.bdgenomics.adam.util.{ ADAMFunSuite, ReferenceContigMap }
import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig, NucleotideContigFragment }

class MDTaggingSuite extends ADAMFunSuite {
val chr1 =
@@ -34,7 +33,13 @@
sc.parallelize(
for {
(contig, start, seq) <- frags
} yield NucleotideContigFragment.newBuilder().setContig(contig).setFragmentStartPosition(start.toLong).setFragmentSequence(seq).build()
} yield (
NucleotideContigFragment.newBuilder()
.setContig(contig)
.setFragmentStartPosition(start.toLong)
.setFragmentEndPosition(start.toLong + seq.length)
.setFragmentSequence(seq).build()
)
)

def makeReads(reads: ((Contig, Int, Int, String, String), String)*): (Map[Int, String], RDD[AlignmentRecord]) = {
@@ -19,9 +19,8 @@ package org.bdgenomics.adam.util

import java.io.File

import org.bdgenomics.utils.io.LocalFileByteAccess
import org.bdgenomics.adam.models.ReferenceRegion
import org.scalatest.FunSuite
import org.bdgenomics.utils.io.LocalFileByteAccess

class TwoBitSuite extends ADAMFunSuite {
test("correctly read sequence from .2bit file") {
@@ -49,4 +48,13 @@
val twoBitFile = new TwoBitFile(byteAccess)
assert(twoBitFile.extract(ReferenceRegion("1", 9990, 10010), true) == "NNNNNNNNNNTAACCCTAAC")
}

test("correctly calculates sequence dictionary") {
val file = new File(resourcePath("hg19.chrM.2bit"))
val byteAccess = new LocalFileByteAccess(file)
val twoBitFile = new TwoBitFile(byteAccess)
val dict = twoBitFile.sequences
assert(dict.records.length == 1)
assert(dict.records.head.length == 16571)
}
}