Commit

work in progress, adding GFF3 support
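
This wires in a loadGff3 method on ADAMContext, a GFF3Parser alongside the existing GTFParser, a saveAsGff3 method on FeatureRDDFunctions, and cross-format save tests. A minimal usage sketch, assuming the usual ADAMContext implicit conversions are in scope; the paths are hypothetical:

import org.apache.spark.{ SparkConf, SparkContext }
import org.bdgenomics.adam.rdd.ADAMContext._

val sc = new SparkContext(new SparkConf().setAppName("gff3-example"))

// load GFF3 features, then write them back out as GFF3 (hypothetical paths)
val features = sc.loadGff3("/data/annotations.gff3")
features.saveAsGff3("/data/annotations.out.gff3")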
heuermh committed May 20, 2016
1 parent 7d7acad commit 8c212da
Showing 6 changed files with 205 additions and 3 deletions.
5 changes: 5 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -638,6 +638,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Logging {
records.map(fastqRecordConverter.convertFragment)
}

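/**
 * Loads features from a file in GFF3 format.
 *
 * @param filePath path to a GFF3 file
 * @param minPartitions minimum number of partitions to load, if specified;
 *   defaults to sc.defaultParallelism
 * @return an RDD of features parsed from the file
 */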
def loadGff3(filePath: String, minPartitions: Option[Int] = None): RDD[Feature] = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)).flatMap(new GFF3Parser().parse)
if (Metrics.isRecording) records.instrument() else records
}

def loadGtf(filePath: String, minPartitions: Option[Int] = None): RDD[Feature] = {
val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism)).flatMap(new GTFParser().parse)
if (Metrics.isRecording) records.instrument() else records
84 changes: 83 additions & 1 deletion adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/FeatureParser.scala
@@ -20,7 +20,7 @@ package org.bdgenomics.adam.rdd.features
import java.io.File
import java.util.UUID
import org.bdgenomics.adam.models.SequenceRecord
import org.bdgenomics.formats.avro.{ Dbxref, Contig, Strand, Feature }
import org.bdgenomics.formats.avro.{ Contig, Dbxref, Feature, OntologyTerm, Strand }
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

@@ -132,6 +132,88 @@ class GTFParser extends FeatureParser {
}
}

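/**
 * Conversions from GFF3 column and attribute values: strand symbols per the
 * GFF3 specification, and "DB:accession" strings split on the first ':' into
 * Dbxref and OntologyTerm records.
 */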
object GFF3Parser {
def toStrand(s: String): Option[Strand] = {
s match {
case "+" => Some(Strand.FORWARD)
case "-" => Some(Strand.REVERSE)
case "." => Some(Strand.INDEPENDENT)
case "?" => Some(Strand.UNKNOWN)
case _ => None
}
}

def toDbxref(s: String): Dbxref = {
val i = s.indexOf(':')
new Dbxref(s.substring(0, i), s.substring(i + 1)) // skip the ':' separator
}

def toOntologyTerm(s: String): OntologyTerm = {
val i = s.indexOf(':')
new OntologyTerm(s.substring(0, i), s.substring(i + 1)) // skip the ':' separator
}
}

/**
* Parser for GFF3 format.
*/
class GFF3Parser extends FeatureParser {

override def parse(line: String): Seq[Feature] = {
// skip header and comment lines
if (line.startsWith("#")) {
return Seq()
}

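// GFF3 lines have nine tab-separated columns:
// seqid, source, type, start, end, score, strand, phase, attributes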
val fields = line.split("\t")
val (seqid, source, featureType, start, end, score, strand, phase, attributes) =
(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6), fields(7), fields(8))

val f = Feature.newBuilder()
.setSource(source)
.setType(featureType)
.setContigName(seqid)
.setStart(start.toLong - 1L) // GFF3 coordinate system is 1-based
.setEnd(end.toLong) // GFF3 ranges are closed

// set score if specified
if (score != ".") {
f.setScore(score.toDouble)
}
// set phase if specified
if (phase != ".") {
f.setPhase(phase.toInt)
}
// set strand if specified
GFF3Parser.toStrand(strand).foreach(f.setStrand(_))

// set id, name, target, gap, derivesFrom, and isCircular
// and populate aliases, notes, parentIds, dbxrefs, and ontologyTerms
// from attributes if specified
val aliases = new ArrayBuffer[String]
val notes = new ArrayBuffer[String]
val parentIds = new ArrayBuffer[String]
val dbxrefs = new ArrayBuffer[Dbxref]
val ontologyTerms = new ArrayBuffer[OntologyTerm]
val remaining = scala.collection.mutable.HashMap.empty[String, String]
// GFF3 attributes are semicolon-separated tag=value pairs;
// multi-valued tags separate their values with commas
attributes.split(";").map(_.trim).filter(_.contains("="))
.map(s => { val i = s.indexOf('='); (s.substring(0, i), s.substring(i + 1)) })
.foreach(entry =>
entry._1 match {
// reserved keys in GFF3 specification
case "ID" => f.setId(entry._2)
case "Name" => f.setName(entry._2)
case "Target" => f.setTarget(entry._2)
case "Gap" => f.setGap(entry._2)
case "Derives_from" => f.setDerivesFrom(entry._2)
case "Is_circular" => f.setIsCircular(entry._2.toBoolean)
case "Alias" => aliases ++= entry._2.split(',')
case "Note" => notes ++= entry._2.split(',')
case "Parent" => parentIds ++= entry._2.split(',')
case "Dbxref" => dbxrefs ++= entry._2.split(',').map(GFF3Parser.toDbxref)
case "Ontology_term" => ontologyTerms ++= entry._2.split(',').map(GFF3Parser.toOntologyTerm)
case _ => remaining += entry
}
)
// the Avro builder does not pre-populate its list fields, so collect
// multi-valued attributes in buffers and set them explicitly
f.setAliases(aliases)
f.setNotes(notes)
f.setParentIds(parentIds)
f.setDbxrefs(dbxrefs)
f.setOntologyTerms(ontologyTerms)
// set remaining attributes, if any
f.setAttributes(remaining)

Seq(f.build())
}
}

class IntervalListParser extends Serializable {
def parse(line: String): (Option[SequenceRecord], Option[Feature]) = {
val fields = line.split("[ \t]+")
43 changes: 42 additions & 1 deletion adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/FeatureRDDFunctions.scala
@@ -22,7 +22,7 @@ import com.google.common.base.Joiner
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.bdgenomics.adam.models._
import org.bdgenomics.formats.avro.{ Strand, Feature }
import org.bdgenomics.formats.avro.{ Dbxref, Feature, OntologyTerm, Strand }
import scala.collection.JavaConversions._

class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Logging {
@@ -166,6 +166,47 @@ class FeatureRDDFunctions(featureRDD: RDD[Feature]) extends Serializable with Logging {
featureRDD.map(toGtf).saveAsTextFile(fileName)
}

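/**
 * Saves this RDD of features as GFF3 format text files, one feature per line.
 *
 * @param fileName path to save to
 */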
def saveAsGff3(fileName: String) = {
def toAttr(entry: (Any, Any)): String = {
entry._1 + "=" + entry._2 // GFF3 column 9 uses tag=value pairs, not GTF-style quoting
}

def toGff3(feature: Feature): String = {
val seqid = feature.getContigName
val source = Option(feature.getSource).getOrElse(".")
val featureType = Option(feature.getType).getOrElse(".")
val start = feature.getStart + 1 // GFF3 coordinate system is 1-based
val end = feature.getEnd // GFF3 ranges are closed
val score = Option(feature.getScore).getOrElse(".")
val strand = feature.getStrand match {
case Strand.FORWARD => "+"
case Strand.REVERSE => "-"
case Strand.INDEPENDENT => "."
case Strand.UNKNOWN => "?"
case _ => "." // strand not set
}
val phase = Option(feature.getPhase).getOrElse(".")

// gather attributes from various fields
val attrs = scala.collection.mutable.HashMap.empty[String, String]
Option(feature.getId).foreach(attrs.put("ID", _))
Option(feature.getName).foreach(attrs.put("Name", _))
Option(feature.getTarget).foreach(attrs.put("Target", _))
Option(feature.getGap).foreach(attrs.put("Gap", _))
Option(feature.getDerivesFrom).foreach(attrs.put("Derives_from", _))
Option(feature.getIsCircular).foreach(b => attrs.put("Is_circular", b.toString))
// multi-valued tags are comma-separated in GFF3
if (!feature.getAliases.isEmpty) attrs.put("Alias", feature.getAliases.mkString(","))
if (!feature.getNotes.isEmpty) attrs.put("Note", feature.getNotes.mkString(","))
if (!feature.getParentIds.isEmpty) attrs.put("Parent", feature.getParentIds.mkString(","))
if (!feature.getDbxrefs.isEmpty) attrs.put("Dbxref", feature.getDbxrefs.map(d => d.getDb + ":" + d.getAccession).mkString(","))
if (!feature.getOntologyTerms.isEmpty) attrs.put("Ontology_term", feature.getOntologyTerms.map(t => t.getDb + ":" + t.getAccession).mkString(","))
attrs ++= feature.getAttributes

val attributes = Joiner.on(";").join(attrs.toSeq.map(toAttr))
Joiner.on("\t").join(seqid, source, featureType, start: java.lang.Long, end: java.lang.Long, score, strand, phase, attributes)
}
featureRDD.map(toGff3).saveAsTextFile(fileName)
}

def saveAsBed(fileName: String) = {
def toBed(feature: Feature): String = {
val chrom = feature.getContig.getContigName
3 changes: 3 additions & 0 deletions adam-core/src/test/resources/features/amphimedon.gff3
@@ -0,0 +1,3 @@
##gff-version 3
Contig13175 JGI gene 1 1864985 . - . ID=Aqu1;Name=Aqu1
Contig100 JGI mRNA 27 746 . - . ID=PAC:15698561;Name=Aqu1.200033;PACid=15698561;Parent=Aqu1
71 changes: 71 additions & 0 deletions adam-core/src/test/scala/org/bdgenomics/adam/rdd/features/FeatureRDDFunctionsSuite.scala
@@ -85,6 +85,13 @@ class FeatureRDDFunctionsSuite extends ADAMFunSuite with TypeCheckedTripleEquals
features.saveAsGtf(outputPath)
}

sparkTest("save GTF as GFF3 format") {
val inputPath = resourcePath("features/Homo_sapiens.GRCh37.75.trun100.gtf")
val features = sc.loadGtf(inputPath)
val outputPath = tempLocation(".gff3")
features.saveAsGff3(outputPath)
}

sparkTest("save GTF as BED format") {
val inputPath = resourcePath("features/Homo_sapiens.GRCh37.75.trun100.gtf")
val features = sc.loadGtf(inputPath)
@@ -120,6 +127,48 @@ class FeatureRDDFunctionsSuite extends ADAMFunSuite with TypeCheckedTripleEquals
pairs.foreach({ pair: (Feature, Feature) => assert(pair._1 === pair._2) })
}

sparkTest("save GFF3 as GTF format") {
val inputPath = resourcePath("features/amphimedon.gff3") // todo: find a more complete example
val features = sc.loadGff3(inputPath)
val outputPath = tempLocation(".gtf")
features.saveAsGtf(outputPath)
}

sparkTest("save GFF3 as BED format") {
val inputPath = resourcePath("features/amphimedon.gff3")
val features = sc.loadGff3(inputPath)
val outputPath = tempLocation(".bed")
features.saveAsBed(outputPath)
}

sparkTest("save GFF3 as IntervalList format") {
val inputPath = resourcePath("features/amphimedon.gff3")
val features = sc.loadGff3(inputPath)
val outputPath = tempLocation(".interval_list")
features.saveAsIntervalList(outputPath)
}

sparkTest("save GFF3 as NarrowPeak format") {
val inputPath = resourcePath("features/amphimedon.gff3")
val features = sc.loadGff3(inputPath)
val outputPath = tempLocation(".narrowPeak")
features.saveAsNarrowPeak(outputPath)
}

sparkTest("round trip GFF3 format") {
val inputPath = resourcePath("features/amphimedon.gff3")
val expected = sc.loadGff3(inputPath)
val outputPath = tempLocation(".gff3")
expected.saveAsGff3(outputPath)

// grab all partitions, may not necessarily be in order; sort by exon
val actual = sc.loadGff3(outputPath + "/part-*")
val pairs = sortByExon(expected).zip(sortByExon(actual)).collect

// separate foreach since assert is not serializable
pairs.foreach({ pair: (Feature, Feature) => assert(pair._1 === pair._2) })
}

sparkTest("save BED as GTF format") {
// invalid BED file, has "pseudogene" in column 7, which should be thickStart
//val inputPath = resourcePath("features/gencode.v7.annotation.trunc10.bed")
@@ -129,6 +178,14 @@ class FeatureRDDFunctionsSuite extends ADAMFunSuite with TypeCheckedTripleEquals
features.saveAsGtf(outputPath)
}

sparkTest("save BED as GFF3 format") {
//val inputPath = resourcePath("features/gencode.v7.annotation.trunc10.bed")
val inputPath = resourcePath("features/ItemRGBDemo.bed")
val features = sc.loadBed(inputPath)
val outputPath = tempLocation(".gff3")
features.saveAsGff3(outputPath)
}

sparkTest("save BED as BED format") {
//val inputPath = resourcePath("features/gencode.v7.annotation.trunc10.bed")
val inputPath = resourcePath("features/ItemRGBDemo.bed")
@@ -175,6 +232,13 @@ class FeatureRDDFunctionsSuite extends ADAMFunSuite with TypeCheckedTripleEquals
features.saveAsGtf(outputPath)
}

sparkTest("save IntervalList as GFF3 format") {
val inputPath = resourcePath("features/SeqCap_EZ_Exome_v3.hg19.interval_list")
val features = sc.loadIntervalList(inputPath)
val outputPath = tempLocation(".gff3")
features.saveAsGff3(outputPath)
}

sparkTest("save IntervalList as BED format") {
val inputPath = resourcePath("features/SeqCap_EZ_Exome_v3.hg19.interval_list")
val features = sc.loadIntervalList(inputPath)
@@ -219,6 +283,13 @@ class FeatureRDDFunctionsSuite extends ADAMFunSuite with TypeCheckedTripleEquals
features.saveAsGtf(outputPath)
}

sparkTest("save NarrowPeak as GFF3 format") {
val inputPath = resourcePath("features/wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
val features = sc.loadNarrowPeak(inputPath)
val outputPath = tempLocation(".gff3")
features.saveAsGff3(outputPath)
}

sparkTest("save NarrowPeak as BED format") {
val inputPath = resourcePath("features/wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
val features = sc.loadNarrowPeak(inputPath)
2 changes: 1 addition & 1 deletion pom.xml
@@ -28,7 +28,7 @@
<hadoop-bam.version>7.5.0</hadoop-bam.version>
<scoverage.version>1.1.1</scoverage.version>
<slf4j.version>1.7.21</slf4j.version>
<bdg-formats.version>0.7.1</bdg-formats.version>
<bdg-formats.version>0.8.1-SNAPSHOT</bdg-formats.version>
<bdg-utils.version>0.2.6</bdg-utils.version>
<htsjdk.version>2.3.0</htsjdk.version>
</properties>