diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/SequenceDictionary.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/SequenceDictionary.scala
index 1b0400c339..8553d58476 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/models/SequenceDictionary.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/SequenceDictionary.scala
@@ -17,11 +17,10 @@
  */
 package org.bdgenomics.adam.models
 
-import htsjdk.samtools.{ SAMFileHeader, SAMSequenceRecord, SAMSequenceDictionary }
+import htsjdk.samtools.{ SAMFileHeader, SAMSequenceDictionary, SAMSequenceRecord }
 import htsjdk.variant.vcf.VCFHeader
-import org.apache.avro.generic.IndexedRecord
-import org.bdgenomics.formats.avro.{ AlignmentRecord, NucleotideContigFragment, Contig }
-import scala.collection._
+import org.bdgenomics.formats.avro.{ Contig, NucleotideContigFragment }
+
 import scala.collection.JavaConversions._
 
 /**
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index 9206cc7460..a0198a342f 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,9 +18,9 @@
 package org.bdgenomics.adam.rdd
 
 import java.io.{ File, FileNotFoundException, InputStream }
-import java.util.regex.Pattern
+
+import htsjdk.samtools.util.Locatable
 import htsjdk.samtools.{ SAMFileHeader, ValidationStringency }
-import htsjdk.samtools.util.{ Interval, Locatable }
 import htsjdk.variant.vcf.VCFHeader
 import org.apache.avro.Schema
 import org.apache.avro.file.DataFileStream
@@ -40,12 +40,7 @@ import org.bdgenomics.adam.converters._
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.io._
 import org.bdgenomics.adam.models._
-import org.bdgenomics.adam.projections.{
-  AlignmentRecordField,
-  FeatureField,
-  NucleotideContigFragmentField,
-  Projection
-}
+import org.bdgenomics.adam.projections.{ FeatureField, Projection }
 import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDD
 import org.bdgenomics.adam.rdd.feature._
 import org.bdgenomics.adam.rdd.fragment.FragmentRDD
@@ -59,9 +54,8 @@ import org.bdgenomics.utils.io.LocalFileByteAccess
 import org.bdgenomics.utils.misc.{ HadoopUtil, Logging }
 import org.seqdoop.hadoop_bam._
 import org.seqdoop.hadoop_bam.util._
+
 import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.Map
 import scala.reflect.ClassTag
 
 /**
@@ -97,7 +91,7 @@ object ADAMContext {
   implicit def sameTypeConversionFn[T, U <: GenomicRDD[T, U]](gRdd: U,
                                                               rdd: RDD[T]): U = {
     // hijack the transform function to discard the old RDD
-    gRdd.transform(oldRdd => rdd)
+    gRdd.transform(_ => rdd)
   }
 
   implicit def readsToVCConversionFn(arRdd: AlignmentRecordRDD,
@@ -143,8 +137,6 @@ private class FileFilter(private val name: String) extends PathFilter {
   }
 }
 
-import org.bdgenomics.adam.rdd.ADAMContext._
-
 /**
  * The ADAMContext provides functions on top of a SparkContext for loading genomic data.
 *
@@ -193,12 +185,13 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
   private def loadSingleVcfMetadata(filePath: String): (SequenceDictionary, Seq[Sample]) = {
     def headerToMetadata(vcfHeader: VCFHeader): (SequenceDictionary, Seq[Sample]) = {
       val sd = SequenceDictionary.fromVCFHeader(vcfHeader)
-      val samples = asScalaBuffer(vcfHeader.getGenotypeSamples)
-        .map(s => {
-          Sample.newBuilder()
-            .setSampleId(s)
-            .build()
-        }).toSeq
+      val samples =
+        asScalaBuffer(vcfHeader.getGenotypeSamples)
+          .map(s =>
+            Sample.newBuilder()
+              .setSampleId(s)
+              .build()
+          )
 
       (sd, samples)
     }
@@ -451,12 +444,11 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
         val rg = loadBamReadGroups(samHeader)
         Some((sd, rg))
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
          log.error(
            s"Loading failed for $fp:\n${e.getMessage}\n\t${e.getStackTrace.take(25).map(_.toString).mkString("\n\t")}"
          )
          None
-        }
      }
    }).reduce((kv1, kv2) => {
      (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
    })
@@ -471,7 +463,7 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
     // contains bams, hadoop-bam is a-ok! i believe that it is better (perf) to
     // just load from a single newAPIHadoopFile call instead of a union across
     // files, so we do that whenever possible
-    val records = if (filteredFiles.size != bamFiles.size) {
+    val records = if (filteredFiles.length != bamFiles.length) {
      sc.union(filteredFiles.map(p => {
        sc.newAPIHadoopFile(p.toString, classOf[AnySAMInputFormat], classOf[LongWritable],
          classOf[SAMRecordWritable], ContextUtil.getConfiguration(job))
@@ -480,7 +472,10 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
      sc.newAPIHadoopFile(filePath, classOf[AnySAMInputFormat], classOf[LongWritable],
        classOf[SAMRecordWritable], ContextUtil.getConfiguration(job))
    }
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
+
    val samRecordConverter = new SAMRecordConverter
    AlignedReadRDD(records.map(p => samRecordConverter.convert(p._2.get)),
@@ -538,7 +533,10 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
        sc.newAPIHadoopFile(p.toString, classOf[BAMInputFormat], classOf[LongWritable],
          classOf[SAMRecordWritable], conf)
      }))
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
+
    val samRecordConverter = new SAMRecordConverter
    AlignedReadRDD(records.map(p => samRecordConverter.convert(p._2.get)),
      seqDict,
@@ -618,7 +616,6 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
 
    // reverse list and return as seq
    list.reverse
-      .toSeq
  }
 
  /**
@@ -675,7 +672,9 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
      classOf[Text],
      ContextUtil.getConfiguration(job)
    )
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
 
    // convert records
    val fastqRecordConverter = new FastqRecordConverter
@@ -786,7 +785,9 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
      classOf[Text],
      ContextUtil.getConfiguration(job)
    )
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
 
    // convert records
    val fastqRecordConverter = new FastqRecordConverter
@@ -845,7 +846,8 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
    val records = readVcfRecords(filePath, None)
 
    // attach instrumentation
-    if (Metrics.isRecording) records.instrument() else records
+    if (Metrics.isRecording)
+      records.instrument()
 
    // load vcf metadata
    val (sd, samples) = loadVcfMetadata(filePath)
@@ -882,7 +884,8 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
    val records = readVcfRecords(filePath, Some(viewRegions))
 
    // attach instrumentation
-    if (Metrics.isRecording) records.instrument() else records
+    if (Metrics.isRecording)
+      records.instrument()
 
    // load vcf metadata
    val (sd, samples) = loadVcfMetadata(filePath)
@@ -952,7 +955,9 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
      classOf[LongWritable],
      classOf[Text]
    )
-    if (Metrics.isRecording) fastaData.instrument() else fastaData
+
+    if (Metrics.isRecording)
+      fastaData.instrument()
 
    val remapData = fastaData.map(kv => (kv._1.get, kv._2.toString))
@@ -984,7 +989,9 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
      classOf[Text],
      ContextUtil.getConfiguration(job)
    )
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
 
    // convert records
    val fastqRecordConverter = new FastqRecordConverter
@@ -1030,7 +1037,10 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
    stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
    val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
      .flatMap(new GFF3Parser().parse(_, stringency))
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
+
    FeatureRDD(records)
  }
@@ -1050,7 +1060,10 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
    stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
    val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
      .flatMap(new GTFParser().parse(_, stringency))
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
+
    FeatureRDD(records)
  }
@@ -1070,7 +1083,10 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
    stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
    val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
      .flatMap(new BEDParser().parse(_, stringency))
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
+
    FeatureRDD(records)
  }
@@ -1090,7 +1106,10 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
    stringency: ValidationStringency = ValidationStringency.LENIENT): FeatureRDD = {
    val records = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
      .flatMap(new NarrowPeakParser().parse(_, stringency))
-    if (Metrics.isRecording) records.instrument() else records
+
+    if (Metrics.isRecording)
+      records.instrument()
+
    FeatureRDD(records)
  }
@@ -1111,11 +1130,16 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
    val parsedLines = sc.textFile(filePath, minPartitions.getOrElse(sc.defaultParallelism))
      .map(new IntervalListParser().parseWithHeader(_, stringency))
-    val (seqDict, records) = (SequenceDictionary(parsedLines.flatMap(_._1).collect(): _*),
-      parsedLines.flatMap(_._2))
+
+    val seqDict = SequenceDictionary(parsedLines.flatMap(_._1).collect(): _*)
+
+    val records = parsedLines.flatMap(_._2)
+
    val seqDictMap = seqDict.records.map(sr => sr.name -> sr).toMap
 
-    if (Metrics.isRecording) records.instrument() else records
+    if (Metrics.isRecording)
+      records.instrument()
+
    FeatureRDD(records, seqDict)
  }
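
Note on the recurring change above: in the surrounding code shown here, the old guard "if (Metrics.isRecording) records.instrument() else records" was an expression whose result was never assigned, so rewriting it as a bare statement and dropping the "else" branch does not change behaviour. Below is a minimal, self-contained Scala sketch of that refactor in isolation; Metrics and instrument() are stand-ins for the bdg-utils instrumentation API, not the real classes, so treat this as an illustration rather than the project's actual code.

object InstrumentGuardSketch {

  // stand-in for the real Metrics object from bdg-utils
  object Metrics {
    var isRecording: Boolean = false
  }

  // stand-in for an RDD carrying the implicit instrument() extension
  final case class Records(rows: Seq[String]) {
    def instrument(): Records = {
      println("instrumentation attached")
      this
    }
  }

  def main(args: Array[String]): Unit = {
    val records = Records(Seq("r1", "r2"))

    // old form: an if/else expression whose value was discarded
    if (Metrics.isRecording) records.instrument() else records

    // new form: a plain statement; equivalent here because the
    // result was not used in either version
    if (Metrics.isRecording)
      records.instrument()
  }
}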