From ab7ca71226b5ee2d46ade6d56c230575a02092b2 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Mon, 22 May 2017 01:44:05 -0700 Subject: [PATCH] [ADAM-1257] Add program record support for alignment/fragment files. Resolves #1257. --- .../adam/cli/TransformAlignments.scala | 42 +++++++++-- .../adam/cli/MergeShardsSuite.scala | 4 +- .../adam/cli/TransformAlignmentsSuite.scala | 10 +-- .../adam/models/ProgramRecord.scala | 4 +- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 72 ++++++++++++++++--- .../org/bdgenomics/adam/rdd/GenomicRDD.scala | 12 ++++ .../adam/rdd/fragment/FragmentRDD.scala | 10 +-- .../adam/rdd/read/AlignmentRecordRDD.scala | 30 +++++++- .../serialization/ADAMKryoRegistrator.scala | 3 + adam-core/src/test/resources/small.sam | 2 + .../adam/rdd/ADAMContextSuite.scala | 44 +++++++++++- .../adam/rdd/ParallelFileMergerSuite.scala | 2 +- .../rdd/read/AlignmentRecordRDDSuite.scala | 22 ++++++ 13 files changed, 225 insertions(+), 32 deletions(-) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala index eb4cfdf8d0..2cf5a632b2 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformAlignments.scala @@ -18,6 +18,7 @@ package org.bdgenomics.adam.cli import htsjdk.samtools.ValidationStringency +import java.time.Instant import org.apache.parquet.filter2.dsl.Dsl._ import org.apache.spark.SparkContext import org.apache.spark.storage.StorageLevel @@ -29,6 +30,7 @@ import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, QualityScoreBin } import org.bdgenomics.adam.rich.RichVariant +import org.bdgenomics.formats.avro.ProcessingStep import org.bdgenomics.utils.cli._ import org.bdgenomics.utils.misc.Logging import org.kohsuke.args4j.{ Argument, Option => Args4jOption } @@ -38,7 +40,9 @@ object TransformAlignments extends BDGCommandCompanion { val commandDescription = "Convert SAM/BAM to ADAM format and optionally perform read pre-processing transformations" def apply(cmdLine: Array[String]) = { - new TransformAlignments(Args4j[TransformAlignmentsArgs](cmdLine)) + val args = Args4j[TransformAlignmentsArgs](cmdLine) + args.command = cmdLine.mkString(" ") + new TransformAlignments(args) } } @@ -123,6 +127,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu var cache: Boolean = false @Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.") var storageLevel: String = "MEMORY_ONLY" + @Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.") + var disableProcessingStep = false + + var command: String = null } class TransformAlignments(protected val args: TransformAlignmentsArgs) extends BDGSparkCommand[TransformAlignmentsArgs] with Logging { @@ -427,7 +435,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B ) } - val aRdd: AlignmentRecordRDD = + val loadedRdd: AlignmentRecordRDD = if (args.forceLoadBam) { sc.loadBam(args.inputPath) } else if (args.forceLoadFastq) { @@ -461,9 +469,33 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B stringency = stringency ) } + + val aRdd: AlignmentRecordRDD = if (args.disableProcessingStep) { + loadedRdd + } else { + // add program info + val about = new About() + val 
version = if (about.isSnapshot()) {
+        "%s--%s".format(about.version(), about.commit())
+      } else {
+        about.version()
+      }
+      val epoch = Instant.now.getEpochSecond
+      val processingStep = ProcessingStep.newBuilder
+        .setId("ADAM_%d".format(epoch))
+        .setProgramName("org.bdgenomics.adam.cli.TransformAlignments")
+        .setVersion(version)
+        .setCommandLine(args.command)
+        .build
+      val newSteps: Seq[ProcessingStep] = (loadedRdd.processingSteps ++ Seq(processingStep))
+      val rddWithProvenance: AlignmentRecordRDD = loadedRdd.copy(processingSteps = newSteps)
+      rddWithProvenance
+    }
+
     val rdd = aRdd.rdd
     val sd = aRdd.sequences
     val rgd = aRdd.recordGroups
+    val pgs = aRdd.processingSteps

     // Optionally load a second RDD and concatenate it with the first.
     // Paired-FASTQ loading is avoided here because that wouldn't make sense
@@ -484,12 +516,12 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
       })

     // if we have a second rdd that we are merging in, process the merger here
-    val (mergedRdd, mergedSd, mergedRgd) = concatOpt.fold((rdd, sd, rgd))(t => {
-      (rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups)
+    val (mergedRdd, mergedSd, mergedRgd, mergedPgs) = concatOpt.fold((rdd, sd, rgd, pgs))(t => {
+      (rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups, pgs ++ t.processingSteps)
     })

     // make a new aligned read rdd, that merges the two RDDs together
-    val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd)
+    val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd, mergedPgs)

     // run our transformation
     val outputRdd = this.apply(newRdd)
diff --git a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/MergeShardsSuite.scala b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/MergeShardsSuite.scala
index 36a45eb44d..2f4d2b817a 100644
--- a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/MergeShardsSuite.scala
+++ b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/MergeShardsSuite.scala
@@ -26,7 +26,8 @@ class MergeShardsSuite extends ADAMFunSuite {
     val inputPath = copyResource("unordered.sam")
     val actualPath = tmpFile("unordered.sam")
     val expectedPath = inputPath
-    TransformAlignments(Array("-single", "-defer_merging", inputPath, actualPath)).run(sc)
+    TransformAlignments(Array("-single", "-defer_merging", "-disable_pg",
+      inputPath, actualPath)).run(sc)
     MergeShards(Array(actualPath + "_tail", actualPath,
       "-header_path", actualPath + "_head")).run(sc)
     checkFiles(expectedPath, actualPath)
@@ -41,6 +42,7 @@ class MergeShardsSuite extends ADAMFunSuite {
       "-sort_lexicographically",
       "-defer_merging",
+      "-disable_pg",
       inputPath, actualPath)).run(sc)
     MergeShards(Array(actualPath + "_tail",
       actualPath,
       "-header_path", actualPath + "_head")).run(sc)
     checkFiles(expectedPath, actualPath)
diff --git a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformAlignmentsSuite.scala b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformAlignmentsSuite.scala
index 7c9156e1b8..e33bdab43d 100644
--- a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformAlignmentsSuite.scala
+++ b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformAlignmentsSuite.scala
@@ -25,7 +25,7 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
     val inputPath = copyResource("unordered.sam")
     val actualPath = tmpFile("unordered.sam")
     val expectedPath = inputPath
-    TransformAlignments(Array("-single", inputPath, actualPath)).run(sc)
+    TransformAlignments(Array("-single", "-disable_pg", inputPath, actualPath)).run(sc)
     checkFiles(expectedPath, actualPath)
   }

@@ -33,7 +33,7 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
    val inputPath = copyResource("unordered.sam")
     val actualPath = tmpFile("ordered.sam")
     val expectedPath = copyResource("ordered.sam")
-    TransformAlignments(Array("-single", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc)
+    TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc)
     checkFiles(expectedPath, actualPath)
   }

@@ -42,8 +42,8 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
     val inputPath = copyResource("unordered.sam")
     val intermediateAdamPath = tmpFile("unordered.adam")
     val actualPath = tmpFile("unordered.sam")
     val expectedPath = inputPath
-    TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc)
-    TransformAlignments(Array("-single", intermediateAdamPath, actualPath)).run(sc)
+    TransformAlignments(Array("-disable_pg", inputPath, intermediateAdamPath)).run(sc)
+    TransformAlignments(Array("-single", "-disable_pg", intermediateAdamPath, actualPath)).run(sc)
     checkFiles(expectedPath, actualPath)
   }

@@ -53,7 +53,7 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
     val actualPath = tmpFile("ordered.sam")
     val expectedPath = copyResource("ordered.sam")
-    TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc)
-    TransformAlignments(Array("-single", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc)
+    TransformAlignments(Array("-disable_pg", inputPath, intermediateAdamPath)).run(sc)
+    TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc)
     checkFiles(expectedPath, actualPath)
   }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/ProgramRecord.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/ProgramRecord.scala
index 15d1bc750b..482d079eeb 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/models/ProgramRecord.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/ProgramRecord.scala
@@ -19,7 +19,7 @@ package org.bdgenomics.adam.models

 import htsjdk.samtools.SAMProgramRecord

-private[models] object ProgramRecord {
+private[adam] object ProgramRecord {

   /**
    * Builds a program record model from a SAM program record.
@@ -51,7 +51,7 @@ private[models] object ProgramRecord {
  * @param previousID An optional ID for the ID of the previous stage that was
  *   run.
  */
-private[models] case class ProgramRecord(
+private[adam] case class ProgramRecord(
     id: String,
     commandLine: Option[String],
     name: Option[String],
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index 8903699468..8cc30e2323 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,7 +18,7 @@ package org.bdgenomics.adam.rdd

 import java.io.{ File, FileNotFoundException, InputStream }
-import htsjdk.samtools.{ SAMFileHeader, ValidationStringency }
+import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
 import htsjdk.samtools.util.Locatable
 import htsjdk.variant.vcf.{
   VCFHeader,
@@ -70,6 +70,7 @@ import org.bdgenomics.formats.avro.{
   Fragment,
   Genotype,
   NucleotideContigFragment,
+  ProcessingStep,
   RecordGroup => RecordGroupMetadata,
   Sample,
   Variant
@@ -180,6 +181,33 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     RecordGroupDictionary.fromSAMHeader(samHeader)
   }

+  /**
+   * @param samHeader The header to extract processing lineage from.
+   * @return Returns the processing steps from the header, converted to Avro models.
+   */
+  private[rdd] def loadBamPrograms(
+    samHeader: SAMFileHeader): Seq[ProcessingStep] = {
+    val pgs = samHeader.getProgramRecords().toSeq
+    pgs.map(convertSAMProgramRecord(_))
+  }
+
+  /**
+   * Builds a program description from an htsjdk program record.
+   *
+   * @param record Program record to convert.
+   * @return Returns an Avro formatted program record.
+   */
+  private[rdd] def convertSAMProgramRecord(
+    record: SAMProgramRecord): ProcessingStep = {
+    val builder = ProcessingStep.newBuilder
+      .setId(record.getId)
+    Option(record.getPreviousProgramGroupId).foreach(builder.setPreviousId(_))
+    Option(record.getProgramVersion).foreach(builder.setVersion(_))
+    Option(record.getProgramName).foreach(builder.setProgramName(_))
+    Option(record.getCommandLine).foreach(builder.setCommandLine(_))
+    builder.build
+  }
+
   /**
    * @param pathName The path name to load VCF format metadata from.
    *   Globs/directories are supported.
@@ -298,6 +326,18 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
       .distinct
   }

+  /**
+   * @param pathName The path name to load Avro processing steps from.
+   *   Globs/directories are supported.
+   * @return Returns a seq of processing steps.
+   */
+  private[rdd] def loadAvroPrograms(pathName: String): Seq[ProcessingStep] = {
+    getFsAndFilesWithFilter(pathName, new FileFilter("_processing.avro"))
+      .map(p => {
+        loadAvro[ProcessingStep](p.toString, ProcessingStep.SCHEMA$)
+      }).reduce(_ ++ _)
+  }
+
   /**
    * @param pathName The path name to load Avro sequence dictionaries from.
    *   Globs/directories are supported.
@@ -596,7 +636,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     require(filteredFiles.nonEmpty,
       "Did not find any files at %s.".format(path))

-    val (seqDict, readGroups) =
+    val (seqDict, readGroups, programs) =
       filteredFiles
         .flatMap(fp => {
           try {
@@ -608,7 +648,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
             log.info("Loaded header from " + fp)
             val sd = loadBamDictionary(samHeader)
             val rg = loadBamReadGroups(samHeader)
-            Some((sd, rg))
+            val pgs = loadBamPrograms(samHeader)
+            Some((sd, rg, pgs))
           } catch {
             case e: Throwable => {
               if (validationStringency == ValidationStringency.STRICT) {
@@ -622,7 +663,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
             }
           }
         }).reduce((kv1, kv2) => {
-          (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
+          (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2, kv1._3 ++ kv2._3)
         })

     val job = HadoopUtil.newJob(sc)
@@ -648,7 +689,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)),
       seqDict,
-      readGroups)
+      readGroups,
+      programs)
   }

   /**
@@ -689,7 +731,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
     require(bamFiles.nonEmpty,
       "Did not find any files at %s.".format(path))

-    val (seqDict, readGroups) = bamFiles
+    val (seqDict, readGroups, programs) = bamFiles
       .map(fp => {
         // We need to separately read the header, so that we can inject the sequence dictionary
         // data into each individual Read (see the argument to samRecordConverter.convert,
@@ -699,10 +741,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
         log.info("Loaded header from " + fp)
         val sd = loadBamDictionary(samHeader)
         val rg = loadBamReadGroups(samHeader)
+        val pgs = loadBamPrograms(samHeader)

-        (sd, rg)
+        (sd, rg, pgs)
       }).reduce((kv1, kv2) => {
-
(kv1._1 ++ kv2._1, kv1._2 ++ kv2._2) + (kv1._1 ++ kv2._1, kv1._2 ++ kv2._2, kv1._3 ++ kv2._3) }) val job = HadoopUtil.newJob(sc) @@ -717,7 +760,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val samRecordConverter = new SAMRecordConverter AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)), seqDict, - readGroups) + readGroups, + programs) } /** @@ -829,7 +873,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log // convert avro to sequence dictionary val rgd = loadAvroRecordGroupDictionary(pathName) - AlignmentRecordRDD(rdd, sd, rgd) + // load processing step descriptions + val pgs = loadAvroPrograms(pathName) + + AlignmentRecordRDD(rdd, sd, rgd, pgs) } /** @@ -1482,10 +1529,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log // convert avro to sequence dictionary val rgd = loadAvroRecordGroupDictionary(pathName) + // load processing step descriptions + val pgs = loadAvroPrograms(pathName) + // load fragment data from parquet val rdd = loadParquet[Fragment](pathName, optPredicate, optProjection) - FragmentRDD(rdd, sd, rgd) + FragmentRDD(rdd, sd, rgd, pgs) } /** diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index e2a9d05e57..58c7633900 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -35,6 +35,7 @@ import org.bdgenomics.adam.models.{ } import org.bdgenomics.formats.avro.{ Contig, + ProcessingStep, RecordGroup => RecordGroupMetadata, Sample } @@ -984,6 +985,11 @@ trait MultisampleGenomicRDD[T, U <: MultisampleGenomicRDD[T, U]] extends Genomic */ abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroReadGroupGenomicRDD[T, U]] extends AvroGenomicRDD[T, U] { + /** + * The processing steps that have been applied to this GenomicRDD. + */ + val processingSteps: Seq[ProcessingStep] + /** * A dictionary describing the read groups attached to this GenomicRDD. */ @@ -998,6 +1004,12 @@ abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroRe Contig.SCHEMA$, contigs) + // save processing metadata + saveAvro("%s/_processing.avro".format(filePath), + rdd.context, + ProcessingStep.SCHEMA$, + processingSteps) + // convert record group to avro and save val rgMetadata = recordGroups.recordGroups .map(_.toMetadata) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala index 8a49396463..5b03933fa4 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala @@ -105,9 +105,11 @@ object FragmentRDD { * @param sequences The genomic sequences this data was aligned to, if any. * @param recordGroups The record groups these Fragments came from. 
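+ * @param processingSteps The processing steps that have been applied to this data.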
 */
-case class FragmentRDD(rdd: RDD[Fragment],
-                       sequences: SequenceDictionary,
-                       recordGroups: RecordGroupDictionary) extends AvroReadGroupGenomicRDD[Fragment, FragmentRDD] {
+case class FragmentRDD(
+    rdd: RDD[Fragment],
+    sequences: SequenceDictionary,
+    recordGroups: RecordGroupDictionary,
+    @transient processingSteps: Seq[ProcessingStep] = Seq.empty) extends AvroReadGroupGenomicRDD[Fragment, FragmentRDD] {

   protected def buildTree(rdd: RDD[(ReferenceRegion, Fragment)])(
     implicit tTag: ClassTag[Fragment]): IntervalArray[ReferenceRegion, Fragment] = {
@@ -144,7 +146,7 @@ case class FragmentRDD(rdd: RDD[Fragment],
     val newRdd = rdd.flatMap(converter.convertFragment)

     // are we aligned?
-    AlignmentRecordRDD(newRdd, sequences, recordGroups)
+    AlignmentRecordRDD(newRdd, sequences, recordGroups, processingSteps)
   }

   /**
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
index 2bfc61bf50..a69668c231 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
@@ -122,12 +122,31 @@ object AlignmentRecordRDD {
       })
     }
   }
+
+  /**
+   * Converts a processing step back to the SAM representation.
+   *
+   * @param ps The processing step to convert.
+   * @return Returns an HTSJDK program group.
+   */
+  private[read] def processingStepToSam(
+    ps: ProcessingStep): SAMProgramRecord = {
+    require(ps.getId != null,
+      "Processing step ID cannot be null (%s).".format(ps))
+    val pg = new SAMProgramRecord(ps.getId)
+    Option(ps.getPreviousId).foreach(pg.setPreviousProgramGroupId(_))
+    Option(ps.getProgramName).foreach(pg.setProgramName)
+    Option(ps.getVersion).foreach(pg.setProgramVersion)
+    Option(ps.getCommandLine).foreach(pg.setCommandLine)
+    pg
+  }
 }

 case class AlignmentRecordRDD(
     rdd: RDD[AlignmentRecord],
     sequences: SequenceDictionary,
-    recordGroups: RecordGroupDictionary) extends AvroReadGroupGenomicRDD[AlignmentRecord, AlignmentRecordRDD] {
+    recordGroups: RecordGroupDictionary,
+    @transient processingSteps: Seq[ProcessingStep] = Seq.empty) extends AvroReadGroupGenomicRDD[AlignmentRecord, AlignmentRecordRDD] {

   /**
    * Replaces the underlying RDD and SequenceDictionary and emits a new object.
@@ -168,7 +187,8 @@ case class AlignmentRecordRDD( def toFragments: FragmentRDD = { FragmentRDD(groupReadsByFragment().map(_.toFragment), sequences, - recordGroups) + recordGroups, + processingSteps) } /** @@ -367,6 +387,12 @@ case class AlignmentRecordRDD( header.setSortOrder(SAMFileHeader.SortOrder.unsorted) } + // get program records and attach to header + val pgRecords = processingSteps.map(r => { + AlignmentRecordRDD.processingStepToSam(r) + }) + header.setProgramRecords(pgRecords) + // broadcast for efficiency val hdrBcast = rdd.context.broadcast(SAMFileHeaderWritable(header)) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala index 1fb1a178ea..fb0f4dbed2 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala @@ -164,6 +164,7 @@ class ADAMKryoRegistrator extends KryoRegistrator { kryo.register(classOf[org.bdgenomics.adam.models.MdTag]) kryo.register(classOf[org.bdgenomics.adam.models.MultiContigNonoverlappingRegions]) kryo.register(classOf[org.bdgenomics.adam.models.NonoverlappingRegions]) + kryo.register(classOf[org.bdgenomics.adam.models.ProgramRecord]) kryo.register(classOf[org.bdgenomics.adam.models.RecordGroup]) kryo.register(classOf[org.bdgenomics.adam.models.RecordGroupDictionary]) kryo.register(classOf[org.bdgenomics.adam.models.ReferencePosition], @@ -251,6 +252,8 @@ class ADAMKryoRegistrator extends KryoRegistrator { new AvroSerializer[org.bdgenomics.formats.avro.NucleotideContigFragment]) kryo.register(classOf[org.bdgenomics.formats.avro.OntologyTerm], new AvroSerializer[org.bdgenomics.formats.avro.OntologyTerm]) + kryo.register(classOf[org.bdgenomics.formats.avro.ProcessingStep], + new AvroSerializer[org.bdgenomics.formats.avro.ProcessingStep]) kryo.register(classOf[org.bdgenomics.formats.avro.Read], new AvroSerializer[org.bdgenomics.formats.avro.Read]) kryo.register(classOf[org.bdgenomics.formats.avro.RecordGroup], diff --git a/adam-core/src/test/resources/small.sam b/adam-core/src/test/resources/small.sam index 7ed3d8de21..d29f8454a4 100644 --- a/adam-core/src/test/resources/small.sam +++ b/adam-core/src/test/resources/small.sam @@ -1,5 +1,7 @@ @SQ SN:1 LN:249250621 @SQ SN:2 LN:243199373 +@PG ID:p1 PN:myProg CL:"myProg 123" VN:1.0.0 +@PG ID:p2 PN:myProg CL:"myProg 456" VN:1.0.0 PP:p1 simread:1:26472783:false 16 1 26472784 60 75M * 0 0 GTATAAGAGCAGCCTTATTCCTATTTATAATCAGGGTGAAACACCTGTGCCAATGCCAAGACAGGGGTGCCAAGA * NM:i:0 AS:i:75 XS:i:0 simread:1:240997787:true 0 1 240997788 60 75M * 0 0 CTTTATTTTTATTTTTAAGGTTTTTTTTGTTTGTTTGTTTTGAGATGGAGTCTCGCTCCACCGCCCAGACTGGAG * NM:i:0 AS:i:75 XS:i:39 simread:1:189606653:true 0 1 189606654 60 75M * 0 0 TGTATCTTCCTCCCCTGCTGTATGTTTCCTGCCCTCAAACATCACACTCCACGTTCTTCAGCTTTAGGACTTGGA * NM:i:0 AS:i:75 XS:i:0 diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala index f96fc17b19..ed096a4fc2 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala @@ -17,7 +17,11 @@ */ package org.bdgenomics.adam.rdd -import htsjdk.samtools.{ SAMFormatException, ValidationStringency } +import htsjdk.samtools.{ + SAMFormatException, + SAMProgramRecord, + ValidationStringency +} import java.io.{ 
File, FileNotFoundException }
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
@@ -32,6 +36,7 @@
 import org.bdgenomics.adam.util.PhredUtils._
 import org.bdgenomics.adam.util.ADAMFunSuite
 import org.bdgenomics.formats.avro._
 import org.seqdoop.hadoop_bam.CRAMInputFormat
+import org.seqdoop.hadoop_bam.util.SAMHeaderReader

 case class TestSaveArgs(var outputPath: String) extends ADAMSaveAnyArgs {
   var sortFastqOutput = false
@@ -538,4 +543,39 @@ class ADAMContextSuite extends ADAMFunSuite {
     val reads = fragments.toReads
     assert(reads.rdd.count === 6)
   }
+
+  sparkTest("convert program record") {
+    val pr = new SAMProgramRecord("pgId")
+    pr.setPreviousProgramGroupId("ppgId")
+    pr.setProgramName("myProg")
+    pr.setProgramVersion("1.2.3")
+    pr.setCommandLine("myProg aCommand")
+    val ps = sc.convertSAMProgramRecord(pr)
+    assert(ps.getId === "pgId")
+    assert(ps.getProgramName === "myProg")
+    assert(ps.getVersion === "1.2.3")
+    assert(ps.getCommandLine === "myProg aCommand")
+    assert(ps.getPreviousId === "ppgId")
+  }
+
+  sparkTest("load program record from sam file") {
+    val input = testFile("small.sam")
+    val samHeader = SAMHeaderReader.readSAMHeaderFrom(new Path(input),
+      sc.hadoopConfiguration)
+    val programs = sc.loadBamPrograms(samHeader)
+    assert(programs.size === 2)
+    val firstPg = programs.filter(_.getPreviousId == null)
+    assert(firstPg.size === 1)
+    assert(firstPg.head.getId === "p1")
+    assert(firstPg.head.getProgramName === "myProg")
+    assert(firstPg.head.getCommandLine === "\"myProg 123\"")
+    assert(firstPg.head.getVersion === "1.0.0")
+    val secondPg = programs.filter(_.getPreviousId != null)
+    assert(secondPg.size === 1)
+    assert(secondPg.head.getId === "p2")
+    assert(secondPg.head.getPreviousId === "p1")
+    assert(secondPg.head.getProgramName === "myProg")
+    assert(secondPg.head.getCommandLine === "\"myProg 456\"")
+    assert(secondPg.head.getVersion === "1.0.0")
+  }
 }
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala
index bf95adf921..80099bc6e6 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ParallelFileMergerSuite.scala
@@ -49,7 +49,7 @@ class ParallelFileMergerSuite extends ADAMFunSuite {
     val files = Seq(testFile("unmapped.sam"), testFile("small.sam"))
       .map(new Path(_))

-    val fileSizes = Seq(29408, 3093)
+    val fileSizes = Seq(29408, 3189)
     val filesWithSizes = files.zip(fileSizes)

     val fs = FileSystem.get(sc.hadoopConfiguration)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
index 1622e8ac60..875b898d73 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala
@@ -978,4 +978,26 @@ class AlignmentRecordRDDSuite extends ADAMFunSuite {
     // small.sam has no record groups
     assert(union.recordGroups.size === reads1.recordGroups.size)
   }
+
+  test("cannot have a null processing step ID") {
+    intercept[IllegalArgumentException] {
+      AlignmentRecordRDD.processingStepToSam(ProcessingStep.newBuilder.build)
+    }
+  }
+
+  test("convert a processing description to htsjdk") {
+    val htsjdkPg = AlignmentRecordRDD.processingStepToSam(
+      ProcessingStep.newBuilder()
+        .setId("pg")
.setProgramName("myProgram") + .setVersion("1") + .setPreviousId("ppg") + .setCommandLine("myProgram run") + .build) + assert(htsjdkPg.getId === "pg") + assert(htsjdkPg.getCommandLine === "myProgram run") + assert(htsjdkPg.getProgramName === "myProgram") + assert(htsjdkPg.getProgramVersion === "1") + assert(htsjdkPg.getPreviousProgramGroupId === "ppg") + } }