FastaConverter Refactor #211

Merged 1 commit on Apr 9, 2014
11 changes: 1 addition & 10 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Fasta2ADAM.scala
@@ -52,16 +52,7 @@ class Fasta2ADAM(protected val args: Fasta2ADAMArgs) extends ADAMSparkCommand[Fa

def run(sc: SparkContext, job: Job) {
log.info("Loading FASTA data from disk.")
val fastaData: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile(args.fastaFile,
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text])

val remapData = fastaData.map(kv => (kv._1.get.toInt, kv._2.toString.toString))

log.info("Converting FASTA to ADAM.")
val adamFasta = FastaConverter(remapData, args.fragmentLength)

val adamFasta = sc.adamSequenceLoad(args.fastaFile, args.fragmentLength)
if (args.verbose) {
println("FASTA contains:")
println(adamFasta.adamGetSequenceDictionary())
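
With this change the command no longer wires up the Hadoop input format by hand; loading and conversion move behind the new adamSequenceLoad method on ADAMContext (added later in this diff). A minimal call-site sketch, assuming the ADAMContext implicits are in scope; the path and fragment length are illustrative, not from the PR:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.avro.ADAMNucleotideContigFragment
import org.bdgenomics.adam.rdd.ADAMContext._ // adds adamSequenceLoad to SparkContext

// Illustrative driver: load a FASTA file as 10 kbp contig fragments.
def loadFasta(sc: SparkContext): RDD[ADAMNucleotideContigFragment] =
  sc.adamSequenceLoad("sample.fasta", 10000L)
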
adam-core/src/main/scala/org/bdgenomics/adam/converters/FastaConverter.scala
@@ -16,16 +16,42 @@
package org.bdgenomics.adam.converters

import org.bdgenomics.adam.avro.ADAMNucleotideContigFragment
import org.bdgenomics.adam.rdd.ADAMContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import scala.math.Ordering._
import scala.Int
import scala.math.Ordering.Int
import scala.Predef._
import scala.Some

/**
* Object for converting an RDD containing FASTA sequence data into ADAM FASTA data.
*/
private[adam] object FastaConverter {

case class FastaDescriptionLine(val fileIndex: Int = -1, val seqId: Int = 0, val descriptionLine: Option[String] = None) {
val (contigName, contigDescription) = parseDescriptionLine(descriptionLine, fileIndex)

private def parseDescriptionLine(descriptionLine: Option[String], id: Int): (Option[String], Option[String]) = {
if (descriptionLine.isEmpty) {
assert(id == -1, "Cannot have a headerless line in a file with more than one fragment.")
(None, None)
} else {
val splitIndex = descriptionLine.get.indexOf(' ')
if (splitIndex >= 0) {
val split = descriptionLine.get.splitAt(splitIndex)

val contigName: String = split._1.stripPrefix(">").trim
val contigDescription: String = split._2.trim

(Some(contigName), Some(contigDescription))

} else {
(Some(descriptionLine.get.stripPrefix(">").trim), None)
}
}
}
}
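
As a side note, not part of the diff: parseDescriptionLine splits a header on its first space, taking the leading token (minus the '>' prefix) as the contig name and the remainder as the description. A sketch of the expected behavior, with made-up header strings:

// Made-up headers; FastaConverter is private[adam], so this sketch assumes
// code living in the org.bdgenomics.adam package.
val withDesc = FastaConverter.FastaDescriptionLine(0, 0, Some(">1 dna:chromosome chromosome:GRCh37:1"))
// withDesc.contigName == Some("1")
// withDesc.contigDescription == Some("dna:chromosome chromosome:GRCh37:1")
val bare = FastaConverter.FastaDescriptionLine(42, 1, Some(">chrM"))
// bare.contigName == Some("chrM"); bare.contigDescription == None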

/**
* Converts an RDD containing ints and strings into an RDD containing ADAM nucleotide
* contig fragments.
@@ -47,64 +73,51 @@ private[adam] object FastaConverter {
val filtered = rdd.map(kv => (kv._1, kv._2.trim()))
.filter((kv: (Int, String)) => !kv._2.startsWith(";"))

val indices: Map[Int, Int] = rdd.filter((kv: (Int, String)) => kv._2.startsWith(">"))
.map((kv: (Int, String)) => kv._1)
.collect
.toList
.sortWith(_ < _)
.zipWithIndex
.reverse
.toMap
val descriptionLines: Map[Int, FastaDescriptionLine] = getDescriptionLines(filtered)
val indexToContigDescription = rdd.context.broadcast(descriptionLines)

val sequenceLines = filtered.filter(kv => !isDescriptionLine(kv._2))

val groupedContigs: RDD[(Int, Seq[(Int, String)])] = if (indices.size == 0) {
filtered.keyBy(kv => -1)
.groupByKey()
val keyedSequences = if (indexToContigDescription.value.size == 0) {
sequenceLines.keyBy(kv => -1)
} else {
filtered.keyBy((kv: (Int, String)) => indices.find(p => p._1 <= kv._1))
.filter((kv: (Option[(Int, Int)], (Int, String))) => kv._1.isDefined)
.map(kv => (kv._1.get._2, kv._2))
.groupByKey()
sequenceLines.keyBy(row => findContigIndex(row._1, indexToContigDescription.value.keys.toList))
}
val groupedContigs = keyedSequences.groupByKey()

val converter = new FastaConverter(maxFragmentLength)

val convertedData = groupedContigs.flatMap(kv => {
val (id, lines) = kv
val descriptionLine = indexToContigDescription.value.getOrElse(id, FastaDescriptionLine())

val descriptionLine = lines.filter(kv => kv._2.startsWith(">"))

val (name, comment): (Option[String], Option[String]) = if (descriptionLine.size == 0) {
assert(id == -1, "Cannot have a headerless line in a file with more than one fragment.")
(None, None)
} else if (descriptionLine.forall(kv => kv._2.contains(' '))) {
val description: String = descriptionLine.head._2
val splitIndex = description.indexOf(' ')
val split = description.splitAt(splitIndex)

val contigName: String = split._1.stripPrefix(">").trim
val contigDescription: String = split._2.trim
assert(lines.length != 0, "Sequence " + descriptionLine.seqId + " has no sequence data.")
val sequence: Seq[String] = lines.sortBy(kv => kv._1).map(kv => cleanSequence(kv._2))
converter.convert(descriptionLine.contigName, descriptionLine.seqId, sequence, descriptionLine.contigDescription)
})

(Some(contigName), Some(contigDescription))
} else {
(Some(descriptionLine.head._2.stripPrefix(">").trim), None)
}
convertedData
}

val seqId = if (id == -1) {
0
} else {
id
}
private def cleanSequence(sequence: String): String = {
sequence.stripSuffix("*")
}

val sequenceLines: Seq[(Int, String)] = lines.filter(kv => !kv._2.startsWith(">"))
assert(sequenceLines.length != 0, "Sequence " + seqId + " has no sequence data.")
private def isDescriptionLine(line: String): Boolean = {
line.startsWith(">")
}

val sequence: Seq[String] = sequenceLines.sortBy(kv => kv._1)
.map(kv => kv._2.stripSuffix("*"))
def getDescriptionLines(rdd: RDD[(Int, String)]): Map[Int, FastaDescriptionLine] = {

converter.convert(name, seqId, sequence, comment)
})
rdd.filter(kv => isDescriptionLine(kv._2))
.collect
.zipWithIndex
.map(kv => (kv._1._1, FastaDescriptionLine(kv._1._1, kv._2, Some(kv._1._2))))
.toMap
}

convertedData
def findContigIndex(rowIdx: Int, indices: List[Int]): Int = {
indices.filter(_ <= rowIdx).max
}
}
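
To make the offset bookkeeping concrete: getDescriptionLines collects every '>' line keyed by its input offset, and findContigIndex assigns a sequence line to the greatest header offset at or below its own. A small sketch with invented offsets, assuming sc is an active SparkContext:

// Two headers at offsets 0 and 120; offsets mimic TextInputFormat's keys.
val headers = sc.parallelize(Seq(
  (0, ">chr1 an example contig"),
  (120, ">chr2 another contig")))
val descriptions = FastaConverter.getDescriptionLines(headers)
val offsets = descriptions.keys.toList

FastaConverter.findContigIndex(57, offsets) // == 0, a chr1 sequence line
FastaConverter.findContigIndex(500, offsets) // == 120, a chr2 sequence line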

@@ -121,7 +134,7 @@ private[converters] class FastaConverter(fragmentLength: Long) extends Serializa
*/
def mapFragments(sequences: Seq[String]): Seq[String] = {
// internal "fsm" variables
var sequence: String = ""
var sequence: StringBuilder = new StringBuilder
var sequenceSeq: List[String] = List()

/**
@@ -132,10 +145,10 @@ private[converters] class FastaConverter(fragmentLength: Long) extends Serializa
* @param seq Fragment string to add.
*/
def addFragment(seq: String) {
sequence += seq
sequence.append(seq)

if (sequence.length > fragmentLength) {
sequenceSeq ::= sequence.take(fragmentLength.toInt)
sequenceSeq ::= sequence.take(fragmentLength.toInt).toString()
sequence = sequence.drop(fragmentLength.toInt)
}
}
@@ -145,7 +158,7 @@ private[converters] class FastaConverter(fragmentLength: Long) extends Serializa

// if we still have a remaining sequence that is not part of a fragment, add it
if (sequence.length != 0) {
sequenceSeq ::= sequence
sequenceSeq ::= sequence.toString()
}

// return our fragments
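
Switching the accumulator from String to StringBuilder avoids re-copying the whole sequence on every append; the chunking behavior itself is unchanged. A sketch of that behavior with an illustrative 4-base fragment length (the final reassembly is collapsed out of this view, so output ordering is not shown):

// Invented input; FastaConverter here is the private[converters] class above.
val chunker = new FastaConverter(4L)
val fragments = chunker.mapFragments(Seq("AAAATT", "TTGG"))
// "AAAATT" -> emit "AAAA", carry "TT"
// "TT" ++ "TTGG" = "TTTTGG" -> emit "TTTT", carry "GG"
// input exhausted -> the short trailing "GG" becomes the last fragment
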
21 changes: 19 additions & 2 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -20,7 +20,7 @@ import org.bdgenomics.adam.avro.{
ADAMRecord,
ADAMNucleotideContigFragment
}
import org.bdgenomics.adam.converters.SAMRecordConverter
import org.bdgenomics.adam.converters.{ FastaConverter, SAMRecordConverter }
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.models.ADAMRod
import org.bdgenomics.adam.projections.{
@@ -39,7 +39,7 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.{ Text, LongWritable }
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{ Logging, SparkContext }
import org.apache.spark.rdd.RDD
@@ -52,6 +52,7 @@ import scala.Some
import scala.collection.JavaConversions._
import scala.collection.Map
import org.bdgenomics.adam.util.HadoopUtil
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

object ADAMContext {
// Add ADAM Spark context methods
@@ -299,6 +300,22 @@ class ADAMContext(sc: SparkContext) extends Serializable with Logging {
}
}

def adamSequenceLoad(filePath: String, fragmentLength: Long): RDD[ADAMNucleotideContigFragment] = {
if (filePath.endsWith(".fasta")) {
val fastaData: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile(filePath,
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text])

val remapData = fastaData.map(kv => (kv._1.get.toInt, kv._2.toString))

log.info("Converting FASTA to ADAM.")
FastaConverter(remapData, fragmentLength)
} else {
adamParquetLoad(filePath)
}
}
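
Note that the dispatch is on the literal ".fasta" suffix: anything else, including a ".fa" file, falls through to adamParquetLoad. Illustrative calls (paths invented):

sc.adamSequenceLoad("hg19.fasta", 10000L) // Hadoop text load + FastaConverter
sc.adamSequenceLoad("contigs.adam", 10000L) // read back as ADAM Parquet
sc.adamSequenceLoad("hg19.fa", 10000L) // also hits the Parquet branch: suffix must be ".fasta"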

/**
* Searches a path recursively, returning the names of all directories in the tree whose
* name matches the given regex.
adam-core/src/test/scala/org/bdgenomics/adam/converters/FastaConverterSuite.scala
@@ -15,16 +15,29 @@
*/
package org.bdgenomics.adam.converters

import org.bdgenomics.adam.avro.{ ADAMNucleotideContigFragment, Base }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.SparkFunSuite
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite

class FastaConverterSuite extends SparkFunSuite {

val converter = new FastaConverter(1000)

sparkTest("find contig index") {
val headerLines = sc.parallelize(Seq(
(0, ">1 dna:chromosome chromosome:GRCh37:1:1:249250621:1"),
(252366306, ">2 dna:chromosome chromosome:GRCh37:2:1:243199373:1"),
(699103487, ">4 dna:chromosome chromosome:GRCh37:4:1:191154276:1"),
(892647244, ">5 dna:chromosome chromosome:GRCh37:5:1:180915260:1"),
(498605724, ">3 dna:chromosome chromosome:GRCh37:3:1:198022430:1")))
val descLines = FastaConverter.getDescriptionLines(headerLines)
val headerIndices: List[Int] = descLines.keys.toList

assert(0 === FastaConverter.findContigIndex(252366300, headerIndices))
assert(892647244 === FastaConverter.findContigIndex(892647249, headerIndices))
assert(252366306 === FastaConverter.findContigIndex(498605720, headerIndices))

}

test("convert a single record without naming information") {
val contig = converter.convert(None, 0, Seq("AAATTTGCGC"), None)
