Skip to content

Commit

Permalink
Estimate contig lengths in SequenceDictionary for BED, GFF3, GTF, and NarrowPeak feature formats (#1411)
Browse files Browse the repository at this point in the history

Estimate contig lengths in SequenceDictionary for BED, GFF3, GTF, and NarrowPeak feature formats
  • Loading branch information
heuermh authored and fnothaft committed Mar 3, 2017
1 parent 55dba3d commit 6ee0b8b
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
.map(new IntervalListParser().parseWithHeader(_, stringency))
val (seqDict, records) = (SequenceDictionary(parsedLines.flatMap(_._1).collect(): _*),
parsedLines.flatMap(_._2))
val seqDictMap = seqDict.records.map(sr => sr.name -> sr).toMap

if (Metrics.isRecording) records.instrument() else records
FeatureRDD(records, seqDict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ private[rdd] class IntervalListParser extends FeatureParser {
(attrs("SN"), attrs("LN").toLong, attrs("UR"), attrs("M5"))
}
Some(SequenceRecord(name, length, md5, url))
} else {
None
}
None
}

override def parse(line: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.bdgenomics.utils.interval.array.{
}
import org.bdgenomics.utils.misc.Logging
import scala.collection.JavaConversions._
import scala.math.max
import scala.reflect.ClassTag

private[adam] case class FeatureArray(
Expand Down Expand Up @@ -101,14 +102,6 @@ private object FeatureOrdering extends FeatureOrdering[Feature] {}

object FeatureRDD {

/**
* @param elem The feature to extract a sequence record from.
* @return Gets the SequenceRecord for this feature.
*/
private def getSequenceRecord(elem: Feature): SequenceRecord = {
SequenceRecord(elem.getContigName, 1L)
}

/**
* Builds a FeatureRDD without SequenceDictionary information by running an
* aggregate to rebuild the SequenceDictionary.
Expand All @@ -121,11 +114,14 @@ object FeatureRDD {
// cache the rdd, since we're making multiple passes
rdd.cache()

// aggregate to create the sequence dictionary
val sd = new SequenceDictionary(rdd.map(getSequenceRecord)
.distinct
.collect
.toVector)
// create sequence records with length max(start, end) + 1L
val sequenceRecords = rdd
.keyBy(_.getContigName)
.map(kv => (kv._1, max(kv._2.getStart, kv._2.getEnd) + 1L))
.reduceByKey(max(_, _))
.map(kv => SequenceRecord(kv._1, kv._2))

val sd = new SequenceDictionary(sequenceRecords.collect.toVector)

FeatureRDD(rdd, sd)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,5 +732,56 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {
assert(c.filter(_._1.isEmpty).forall(_._2.size == 1))
assert(c0.filter(_._1.isEmpty).forall(_._2.size == 1))
}
}

sparkTest("estimate sequence dictionary contig lengths from GTF format") {
  val path = testFile("Homo_sapiens.GRCh37.75.trun100.gtf")
  val features = sc.loadGtf(path)
  // largest max(start, end) seen on contig "1" in this fixture is 36081
  val contig = features.sequences("1")
  assert(features.sequences.containsRefName("1"))
  assert(contig.isDefined)
  assert(contig.get.length >= 36081L)
}

sparkTest("estimate sequence dictionary contig lengths from GFF3 format") {
  val path = testFile("dvl1.200.gff3")
  val features = sc.loadGff3(path)
  // largest max(start, end) seen on contig "1" in this fixture is 1356705
  val contig = features.sequences("1")
  assert(features.sequences.containsRefName("1"))
  assert(contig.isDefined)
  assert(contig.get.length >= 1356705L)
}

sparkTest("estimate sequence dictionary contig lengths from BED format") {
  val path = testFile("dvl1.200.bed")
  val features = sc.loadBed(path)
  // largest max(start, end) seen on contig "1" in this fixture is 1358504
  val contig = features.sequences("1")
  assert(features.sequences.containsRefName("1"))
  assert(contig.isDefined)
  assert(contig.get.length >= 1358504L)
}

sparkTest("obtain sequence dictionary contig lengths from header in IntervalList format") {
  val path = testFile("SeqCap_EZ_Exome_v3.hg19.interval_list")
  val features = sc.loadIntervalList(path)
  // the interval_list header declares:
  //   @SQ SN:chr1 LN:249250621
  //   @SQ SN:chr2 LN:243199373
  val chr1 = features.sequences("chr1")
  assert(features.sequences.containsRefName("chr1"))
  assert(chr1.isDefined)
  assert(chr1.get.length >= 249250621L)

  val chr2 = features.sequences("chr2")
  assert(features.sequences.containsRefName("chr2"))
  assert(chr2.isDefined)
  assert(chr2.get.length >= 243199373L)
}

sparkTest("estimate sequence dictionary contig lengths from NarrowPeak format") {
  val path = testFile("wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
  val features = sc.loadNarrowPeak(path)
  // largest max(start, end) seen on contig "chr1" in this fixture is 794336
  val contig = features.sequences("chr1")
  assert(features.sequences.containsRefName("chr1"))
  assert(contig.isDefined)
  assert(contig.get.length >= 794336L)
}
}

0 comments on commit 6ee0b8b

Please sign in to comment.