diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala
index fb66c4b4f0..d1d1f2ca2c 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Coverage.scala
@@ -55,6 +55,8 @@ class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
   var onlyNegativeStrands: Boolean = false
   @Args4jOption(required = false, name = "-only_positive_strands", usage = "Compute coverage for positive strands")
   var onlyPositiveStrands: Boolean = false
+  @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
+  var asSingleFile: Boolean = false
 }
 
 class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
@@ -80,7 +82,7 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
     }
 
     finalReads.toCoverage(args.collapse)
-      .save(args.outputPath)
+      .save(args.outputPath, asSingleFile = args.asSingleFile)
   }
 }
 
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/CoverageRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/CoverageRDD.scala
index 6d1e4b5fd7..2a339c16ea 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/CoverageRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/features/CoverageRDD.scala
@@ -52,8 +52,9 @@ case class CoverageRDD(rdd: RDD[Coverage],
    *
    * @param filePath The location to write the output.
+   * @param asSingleFile If true, merges the sharded output into a single file.
    */
-  def save(filePath: java.lang.String) = {
-    this.toFeatureRDD.save(filePath)
+  def save(filePath: java.lang.String, asSingleFile: java.lang.Boolean) = {
+    this.toFeatureRDD.save(filePath, asSingleFile = asSingleFile)
   }
 
   /**
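Making asSingleFile a required second parameter breaks source compatibility for existing save(filePath) callers; the CoverageRDDSuite change below now has to pass the flag explicitly. A minimal sketch of an alternative, not part of this patch: default the new parameter so one-argument call sites keep compiling.

    // hypothetical variant of CoverageRDD.save (illustrative only):
    // a defaulted flag preserves the old one-argument call sites.
    def save(filePath: java.lang.String,
             asSingleFile: java.lang.Boolean = false) = {
      this.toFeatureRDD.save(filePath, asSingleFile = asSingleFile)
    }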
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
index 9c27d3ac95..1483046fa9 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
@@ -28,49 +28,26 @@ import org.seqdoop.hadoop_bam.{
   SAMRecordWritable
 }
 
-object ADAMBAMOutputFormat extends Serializable {
+class ADAMBAMOutputFormat[K]
+  extends KeyIgnoringBAMOutputFormat[K] with Serializable {
 
-  private[read] var header: Option[SAMFileHeader] = None
+  setWriteHeader(true)
 
-  /**
-   * Attaches a header to the ADAMSAMOutputFormat Hadoop writer. If a header has previously
-   * been attached, the header must be cleared first.
-   *
-   * @throws Exception Exception thrown if a SAM header has previously been attached, and not cleared.
-   *
-   * @param samHeader Header to attach.
-   *
-   * @see clearHeader
-   */
-  def addHeader(samHeader: SAMFileHeader) {
-    assert(header.isEmpty, "Cannot attach a new SAM header without first clearing the header.")
-    header = Some(samHeader)
-  }
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
+    val conf = context.getConfiguration()
 
-  /**
-   * Clears the attached header.
-   *
-   * @see addHeader
-   */
-  def clearHeader() {
-    header = None
-  }
+    // where is our header file?
+    val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))
 
-  /**
-   * Returns the current header.
-   *
-   * @return Current SAM header.
-   */
-  private[read] def getHeader: SAMFileHeader = {
-    assert(header.isDefined, "Cannot return header if not attached.")
-    header.get
-  }
-}
+    // read the header file
+    readSAMHeaderFrom(path, conf)
 
-class ADAMBAMOutputFormat[K]
-  extends KeyIgnoringBAMOutputFormat[K] with Serializable {
+    // now that we have the header set, we need to make a record writer
+    return new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
+      header,
+      true,
+      context)
+  }
 
-  setSAMHeader(ADAMBAMOutputFormat.getHeader)
 }
 
 class InstrumentedADAMBAMOutputFormat[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
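The rewrite above replaces mutable singleton state (the old addHeader/clearHeader dance, which had to be replayed on every executor) with a Hadoop-configuration handshake: the driver stages the header in a side file and records its location under a well-known key, and each task's getRecordWriter re-reads it locally. A self-contained sketch of the same pattern, with illustrative names (the key string and paths here are not ADAM's):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ FileSystem, Path }

    object HeaderHandshakeSketch {
      val headerPathKey = "example.header_path" // illustrative, not ADAM's key

      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val fs = FileSystem.getLocal(conf)

        // "driver" side: write the shared metadata once, record where it lives
        val headPath = new Path("/tmp/example_head")
        val os = fs.create(headPath, true)
        os.writeBytes("@HD\tVN:1.5\n")
        os.close()
        conf.set(headerPathKey, headPath.toString)

        // "task" side: any process holding the conf can recover the metadata,
        // without shared JVM state or synchronized blocks
        val is = fs.open(new Path(conf.get(headerPathKey)))
        val reader = new java.io.BufferedReader(new java.io.InputStreamReader(is))
        assert(reader.readLine().startsWith("@HD"))
        reader.close()
      }
    }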
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
index e9b334610d..b035f739f2 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
@@ -29,49 +29,26 @@ import org.seqdoop.hadoop_bam.{
   SAMRecordWritable
 }
 
-object ADAMSAMOutputFormat extends Serializable {
+class ADAMSAMOutputFormat[K]
+  extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
 
-  private[read] var header: Option[SAMFileHeader] = None
+  setWriteHeader(true)
 
-  /**
-   * Attaches a header to the ADAMSAMOutputFormat Hadoop writer. If a header has previously
-   * been attached, the header must be cleared first.
-   *
-   * @throws Exception Exception thrown if a SAM header has previously been attached, and not cleared.
-   *
-   * @param samHeader Header to attach.
-   *
-   * @see clearHeader
-   */
-  def addHeader(samHeader: SAMFileHeader) {
-    assert(header.isEmpty, "Cannot attach a new SAM header without first clearing the header.")
-    header = Some(samHeader)
-  }
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
+    val conf = context.getConfiguration()
 
-  /**
-   * Clears the attached header.
-   *
-   * @see addHeader
-   */
-  def clearHeader() {
-    header = None
-  }
+    // where is our header file?
+    val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))
 
-  /**
-   * Returns the current header.
-   *
-   * @return Current SAM header.
-   */
-  private[read] def getHeader: SAMFileHeader = {
-    assert(header.isDefined, "Cannot return header if not attached.")
-    header.get
-  }
-}
+    // read the header file
+    readSAMHeaderFrom(path, conf)
 
-class ADAMSAMOutputFormat[K]
-  extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
+    // now that we have the header set, we need to make a record writer
+    return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
+      header,
+      true,
+      context)
+  }
 
-  setSAMHeader(ADAMSAMOutputFormat.getHeader)
 }
 
 class InstrumentedADAMSAMOutputFormat[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
index cd7b18212b..7b7c13253a 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala
@@ -315,112 +315,77 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
 
     // write file to disk
     val conf = rdd.context.hadoopConfiguration
 
-    if (!asSingleFile) {
-      val bcastHeader = rdd.context.broadcast(header)
-      val mp = rdd.mapPartitionsWithIndex((idx, iter) => {
-        log.info(s"Setting ${if (asSam) "SAM" else "BAM"} header for partition $idx")
-        val header = bcastHeader.value
-        synchronized {
-          // perform map partition call to ensure that the SAM/BAM header is set on all
-          // nodes in the cluster; see:
-          // https://github.com/bigdatagenomics/adam/issues/353,
-          // https://github.com/bigdatagenomics/adam/issues/676
-
-          asSam match {
-            case true =>
-              ADAMSAMOutputFormat.clearHeader()
-              ADAMSAMOutputFormat.addHeader(header)
-              log.info(s"Set SAM header for partition $idx")
-            case false =>
-              ADAMBAMOutputFormat.clearHeader()
-              ADAMBAMOutputFormat.addHeader(header)
-              log.info(s"Set BAM header for partition $idx")
-          }
-        }
-        Iterator[Int]()
-      }).count()
+    // get file system
+    val headPath = new Path(filePath + "_head")
+    val tailPath = new Path(filePath + "_tail")
+    val outputPath = new Path(filePath)
+    val fs = headPath.getFileSystem(rdd.context.hadoopConfiguration)
+
+    // note: SAM and BAM headers are written in completely different ways
+    if (asSam) {
+      SAMHeaderWriter.writeHeader(fs, headPath, header)
+    } else {
 
-      // force value check, ensure that computation happens
-      if (mp != 0) {
-        log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.")
-      }
+      // get an output stream
+      val os = fs.create(headPath)
+        .asInstanceOf[OutputStream]
+
+      // create htsjdk specific streams for writing the bam header
+      val compressedOut: OutputStream = new BlockCompressedOutputStream(os, null)
+      val binaryCodec = new BinaryCodec(compressedOut)
+
+      // write a bam header - cribbed from Hadoop-BAM
+      binaryCodec.writeBytes("BAM\001".getBytes())
+      val sw: Writer = new StringWriter()
+      new SAMTextHeaderCodec().encode(sw, header)
+      binaryCodec.writeString(sw.toString, true, false)
+
+      // write sequence dictionary
+      val ssd = header.getSequenceDictionary
+      binaryCodec.writeInt(ssd.size())
+      ssd.getSequences
+        .toList
+        .foreach(r => {
+          binaryCodec.writeString(r.getSequenceName(), true, true)
+          binaryCodec.writeInt(r.getSequenceLength())
+        })
+
+      // flush and close all the streams
+      compressedOut.flush()
+      compressedOut.close()
+      os.flush()
+      os.close()
+    }
 
-      // attach header to output format
-      asSam match {
-        case true =>
-          ADAMSAMOutputFormat.clearHeader()
-          ADAMSAMOutputFormat.addHeader(header)
-          log.info("Set SAM header on driver")
-        case false =>
-          ADAMBAMOutputFormat.clearHeader()
-          ADAMBAMOutputFormat.addHeader(header)
-          log.info("Set BAM header on driver")
-      }
+    // set path to header file
+    conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)
 
-      asSam match {
-        case true =>
-          withKey.saveAsNewAPIHadoopFile(
-            filePath,
-            classOf[LongWritable],
-            classOf[SAMRecordWritable],
-            classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
-            conf
-          )
-        case false =>
-          withKey.saveAsNewAPIHadoopFile(
-            filePath,
-            classOf[LongWritable],
-            classOf[SAMRecordWritable],
-            classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
-            conf
-          )
+    if (!asSingleFile) {
+      if (asSam) {
+        withKey.saveAsNewAPIHadoopFile(
+          filePath,
+          classOf[LongWritable],
+          classOf[SAMRecordWritable],
+          classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
+          conf
+        )
+      } else {
+        withKey.saveAsNewAPIHadoopFile(
+          filePath,
+          classOf[LongWritable],
+          classOf[SAMRecordWritable],
+          classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
+          conf
+        )
       }
+
+      // clean up the header after writing
+      fs.delete(headPath, true)
     } else {
       log.info(s"Writing single ${if (asSam) "SAM" else "BAM"} file (not Hadoop-style directory)")
 
-      val headPath = new Path(filePath + "_head")
       val tailPath = new Path(filePath + "_tail")
       val outputPath = new Path(filePath)
-      val fs = headPath.getFileSystem(rdd.context.hadoopConfiguration)
-
-      // TIL: sam and bam are written in completely different ways!
-      if (asSam) {
-        SAMHeaderWriter.writeHeader(fs, headPath, header)
-      } else {
-
-        // get an output stream
-        val os = fs.create(headPath)
-          .asInstanceOf[OutputStream]
-
-        // create htsjdk specific streams for writing the bam header
-        val compressedOut: OutputStream = new BlockCompressedOutputStream(os, null)
-        val binaryCodec = new BinaryCodec(compressedOut);
-
-        // write a bam header - cribbed from Hadoop-BAM
-        binaryCodec.writeBytes("BAM\001".getBytes())
-        val sw: Writer = new StringWriter()
-        new SAMTextHeaderCodec().encode(sw, header)
-        binaryCodec.writeString(sw.toString, true, false)
-
-        // write sequence dictionary
-        val ssd = header.getSequenceDictionary
-        binaryCodec.writeInt(ssd.size())
-        ssd.getSequences
-          .toList
-          .foreach(r => {
-            binaryCodec.writeString(r.getSequenceName(), true, true)
-            binaryCodec.writeInt(r.getSequenceLength())
-          })
-
-        // flush and close all the streams
-        compressedOut.flush()
-        compressedOut.close()
-        os.flush()
-        os.close()
-      }
-
-      // set path to header file
-      conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)
 
       // set up output format
       val headerLessOutputFormat = if (asSam) {
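The hand-rolled block above writes the standard BAM container preamble (magic bytes, SAM header text, then the reference dictionary) into a BGZF stream, which is what lets the _head file be byte-concatenated with the compressed record shards later. A self-contained sketch of that layout using the same htsjdk classes; the header text and sequence values are illustrative:

    import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream }
    import java.io.{ ByteArrayOutputStream, File }

    object BamHeaderSketch extends App {
      val raw = new ByteArrayOutputStream()
      val bgzf = new BlockCompressedOutputStream(raw, null.asInstanceOf[File])
      val codec = new BinaryCodec(bgzf)

      codec.writeBytes("BAM\001".getBytes())     // magic: "BAM\1"
      val text = "@HD\tVN:1.5\n@SQ\tSN:chr11\tLN:100\n"
      codec.writeString(text, true, false)       // l_text, then text (no NUL)
      codec.writeInt(1)                          // n_ref
      codec.writeString("chr11", true, true)     // l_name, name, trailing NUL
      codec.writeInt(100)                        // l_ref
      bgzf.close()                               // flush blocks + BGZF terminator

      assert(raw.toByteArray.nonEmpty)
    }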
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ADAMVCFOutputFormat.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ADAMVCFOutputFormat.scala
index e241d6999a..c80c7d209f 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ADAMVCFOutputFormat.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ADAMVCFOutputFormat.scala
@@ -18,43 +18,61 @@
 package org.bdgenomics.adam.rdd.variation
 
 import htsjdk.variant.vcf.{ VCFHeaderLine, VCFHeader }
+import org.apache.hadoop.fs.{ FileSystem, Path }
+import org.apache.hadoop.mapreduce.{ RecordWriter, TaskAttemptContext }
 import org.bdgenomics.adam.converters.SupportedHeaderLines
 import org.bdgenomics.adam.models.SequenceDictionary
-import org.seqdoop.hadoop_bam.{ VCFFormat, KeyIgnoringVCFOutputFormat }
-import scala.collection.JavaConversions._
+import org.seqdoop.hadoop_bam.{
+  KeyIgnoringVCFOutputFormat,
+  KeyIgnoringVCFRecordWriter,
+  VariantContextWritable,
+  VCFFormat
+}
 
-object ADAMVCFOutputFormat extends Serializable {
-  private var header: Option[VCFHeader] = None
+/**
+ * Wrapper for Hadoop-BAM to work around requirement for no-args constructor.
+ *
+ * @tparam K
+ */
+class ADAMVCFOutputFormat[K] extends KeyIgnoringVCFOutputFormat[K](VCFFormat.VCF) with Serializable {
 
-  def getHeader: VCFHeader = header match {
-    case Some(h) => h
-    case None    => setHeader(Seq(), SequenceDictionary.empty)
-  }
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, VariantContextWritable] = {
+    val conf = context.getConfiguration()
 
-  def clearHeader(): Unit = {
-    header = None
-  }
+    // where is our header file?
+    val path = new Path(conf.get("org.bdgenomics.adam.rdd.variation.vcf_header_path"))
 
-  def setHeader(samples: Seq[String],
-                sequences: SequenceDictionary): VCFHeader = {
-    header = Some({
-      val hdr = new VCFHeader(
-        (SupportedHeaderLines.infoHeaderLines ++ SupportedHeaderLines.formatHeaderLines).toSet: Set[VCFHeaderLine],
-        samples
-      )
-      hdr.setSequenceDictionary(sequences.toSAMSequenceDictionary)
-      hdr
-    })
-    header.get
+    // read the header file
+    readHeaderFrom(path, FileSystem.get(conf))
+
+    // return record writer
+    return new KeyIgnoringVCFRecordWriter[K](getDefaultWorkFile(context, ""),
+      header,
+      true,
+      context)
   }
 }
 
 /**
- * Wrapper for Hadoop-BAM to work around requirement for no-args constructor. Depends on
- * ADAMVCFOutputFormat object to maintain global state (such as samples)
+ * Wrapper for Hadoop-BAM to work around requirement for no-args constructor.
  *
  * @tparam K
  */
-class ADAMVCFOutputFormat[K] extends KeyIgnoringVCFOutputFormat[K](VCFFormat.VCF) with Serializable {
-  setHeader(ADAMVCFOutputFormat.getHeader)
+class ADAMHeaderlessVCFOutputFormat[K] extends KeyIgnoringVCFOutputFormat[K](VCFFormat.VCF) with Serializable {
+
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, VariantContextWritable] = {
+    val conf = context.getConfiguration()
+
+    // where is our header file?
+    val path = new Path(conf.get("org.bdgenomics.adam.rdd.variation.vcf_header_path"))
+
+    // read the header file
+    readHeaderFrom(path, FileSystem.get(conf))
+
+    // return record writer
+    return new KeyIgnoringVCFRecordWriter[K](getDefaultWorkFile(context, ""),
+      header,
+      false,
+      context)
+  }
 }
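The two formats differ only in the boolean passed to KeyIgnoringVCFRecordWriter: ADAMVCFOutputFormat writes the header into every shard (fine for multi-file output), while ADAMHeaderlessVCFOutputFormat suppresses it so that, in single-file mode, the _head file plus the headerless shards concatenate into one well-formed VCF. A sketch of the concatenation this enables, assuming FileMerger.mergeFiles behaves roughly like this (the real implementation lives in org.bdgenomics.adam.rdd.FileMerger and may differ):

    import org.apache.hadoop.fs.{ FileSystem, Path }
    import org.apache.hadoop.io.IOUtils

    object ShardConcatSketch {
      // append an optional header file, then each part file in order,
      // into a single output file
      def concatShards(fs: FileSystem,
                       outputPath: Path,
                       tailPath: Path,
                       headPath: Option[Path]): Unit = {
        val os = fs.create(outputPath, true)
        def append(p: Path): Unit = {
          val is = fs.open(p)
          IOUtils.copyBytes(is, os, 4096, false) // false: leave os open
          is.close()
        }
        headPath.foreach(append)
        fs.listStatus(tailPath)
          .map(_.getPath)
          .filter(_.getName.startsWith("part-"))
          .sortBy(_.getName)
          .foreach(append)
        os.close()
      }
    }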
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/VariantContextRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/VariantContextRDD.scala
index 8e8467bb56..0a5d63fa99 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/VariantContextRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/VariantContextRDD.scala
@@ -17,11 +17,20 @@
  */
 package org.bdgenomics.adam.rdd.variation
 
+import htsjdk.variant.variantcontext.writer.{
+  Options,
+  VariantContextWriterBuilder
+}
+import htsjdk.variant.vcf.{ VCFHeader, VCFHeaderLine }
+import java.io.OutputStream
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.fs.{ FileSystem, Path }
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.bdgenomics.adam.converters.VariantContextConverter
+import org.bdgenomics.adam.converters.{
+  SupportedHeaderLines,
+  VariantContextConverter
+}
 import org.bdgenomics.adam.models.{
   ReferenceRegion,
   SequenceDictionary,
@@ -31,6 +40,7 @@ import org.bdgenomics.adam.rdd.{ FileMerger, MultisampleGenomicRDD }
 import org.bdgenomics.formats.avro.Sample
 import org.bdgenomics.utils.cli.SaveArgs
 import org.seqdoop.hadoop_bam._
+import scala.collection.JavaConversions._
 
 /**
  * An RDD containing VariantContexts attached to a reference and samples.
@@ -108,37 +118,12 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
     val vcfFormat = VCFFormat.inferFromFilePath(filePath)
     assert(vcfFormat == VCFFormat.VCF, "BCF not yet supported") // TODO: Add BCF support
 
-    rdd.cache()
     log.info(s"Writing $vcfFormat file to $filePath")
 
     // map samples to sample ids
     val sampleIds = samples.map(_.getSampleId)
 
-    // Initialize global header object required by Hadoop VCF Writer
-    val bcastHeader = rdd.context.broadcast(sampleIds)
-    val mp = rdd.mapPartitionsWithIndex((idx, iter) => {
-      log.info(s"Setting header for partition $idx")
-      synchronized {
-        // perform map partition call to ensure that the VCF header is set on all
-        // nodes in the cluster; see:
-        // https://github.com/bigdatagenomics/adam/issues/353,
-        // https://github.com/bigdatagenomics/adam/issues/676
-        ADAMVCFOutputFormat.clearHeader()
-        ADAMVCFOutputFormat.setHeader(bcastHeader.value, sequences)
-        log.info(s"Set VCF header for partition $idx")
-      }
-      Iterator[Int]()
-    }).count()
-
-    // force value check, ensure that computation happens
-    if (mp != 0) {
-      log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.")
-    }
-
-    ADAMVCFOutputFormat.clearHeader()
-    ADAMVCFOutputFormat.setHeader(bcastHeader.value, sequences)
-    log.info("Set VCF header on driver")
-
+    // sort if needed
     val keyByPosition = rdd.keyBy(_.position)
     val maybeSortedByKey = if (sortOnSave) {
       keyByPosition.sortByKey()
@@ -154,34 +139,75 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
       (new LongWritable(kv._1.pos), vcw)
     })
 
-    // save to disk
+    // configure things for saving to disk
     val conf = rdd.context.hadoopConfiguration
+    val fs = FileSystem.get(conf)
     conf.set(VCFOutputFormat.OUTPUT_VCF_FORMAT_PROPERTY, vcfFormat.toString)
 
+    // make header
+    val headerLines: Set[VCFHeaderLine] = (SupportedHeaderLines.infoHeaderLines ++
+      SupportedHeaderLines.formatHeaderLines).toSet
+    val header = new VCFHeader(
+      headerLines,
+      samples.map(_.getSampleId))
+    header.setSequenceDictionary(sequences.toSAMSequenceDictionary)
+
+    // write header
+    val headPath = new Path("%s_head".format(filePath))
+
+    // get an output stream
+    val os = fs.create(headPath)
+      .asInstanceOf[OutputStream]
+
+    // build a VariantContextWriter for writing out the header
+    val vcw = new VariantContextWriterBuilder()
+      .setOutputVCFStream(os)
+      .clearIndexCreator()
+      .unsetOption(Options.INDEX_ON_THE_FLY)
+      .build()
+
+    // write the header
+    vcw.writeHeader(header)
+
+    // close the writer and the underlying stream
+    vcw.close()
+    os.flush()
+    os.close()
+
+    // set path to header file
+    conf.set("org.bdgenomics.adam.rdd.variation.vcf_header_path", headPath.toString)
+
     if (asSingleFile) {
 
       // write shards to disk
       val tailPath = "%s_tail".format(filePath)
       writableVCs.saveAsNewAPIHadoopFile(
         tailPath,
-        classOf[LongWritable], classOf[VariantContextWritable], classOf[ADAMVCFOutputFormat[LongWritable]],
+        classOf[LongWritable],
+        classOf[VariantContextWritable],
+        classOf[ADAMHeaderlessVCFOutputFormat[LongWritable]],
         conf
       )
 
       // merge shards
-      FileMerger.mergeFiles(FileSystem.get(conf),
+      FileMerger.mergeFiles(fs,
         new Path(filePath),
-        new Path(tailPath))
+        new Path(tailPath),
+        Some(headPath))
     } else {
+
+      // write shards
       writableVCs.saveAsNewAPIHadoopFile(
         filePath,
-        classOf[LongWritable], classOf[VariantContextWritable], classOf[ADAMVCFOutputFormat[LongWritable]],
+        classOf[LongWritable],
+        classOf[VariantContextWritable],
+        classOf[ADAMVCFOutputFormat[LongWritable]],
         conf
       )
-    }
 
-    log.info("Write %d records".format(writableVCs.count()))
-    rdd.unpersist()
+      // remove header file
+      //fs.delete(headPath, true)
+    }
   }
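Note the writer configuration: clearIndexCreator() and unsetOption(Options.INDEX_ON_THE_FLY) matter because this writer only ever emits the header, never a record. A self-contained sketch checking that an htsjdk writer configured this way produces header lines only (an empty VCFHeader here, purely for illustration):

    import htsjdk.variant.variantcontext.writer.{
      Options,
      VariantContextWriterBuilder
    }
    import htsjdk.variant.vcf.VCFHeader
    import java.io.ByteArrayOutputStream

    object VcfHeaderSketch extends App {
      val os = new ByteArrayOutputStream()
      val vcw = new VariantContextWriterBuilder()
        .setOutputVCFStream(os)
        .clearIndexCreator()
        .unsetOption(Options.INDEX_ON_THE_FLY)
        .build()

      // write only the header; no variant records touch this writer
      vcw.writeHeader(new VCFHeader())
      vcw.close()

      val text = os.toString("UTF-8")
      assert(text.startsWith("##fileformat=VCF"))
      assert(text.split("\n").forall(_.startsWith("#")))
    }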
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/features/CoverageRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/features/CoverageRDDSuite.scala
index 70a264da7e..512cdea7f8 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/features/CoverageRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/features/CoverageRDDSuite.scala
@@ -48,7 +48,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
     val coverageRDD: CoverageRDD = featureRDD.toCoverage
 
     val outputFile = tmpLocation(".bed")
-    coverageRDD.save(outputFile)
+    coverageRDD.save(outputFile, false)
 
     val coverage = sc.loadCoverage(outputFile)
     assert(coverage.rdd.count == 3)
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/VariantContextRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/VariantContextRDDSuite.scala
index 10e04845f7..6f0d302668 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/VariantContextRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/VariantContextRDDSuite.scala
@@ -66,6 +66,16 @@ class VariantContextRDDSuite extends ADAMFunSuite {
     assert(vcRdd.sequences.records(0).name === "chr11")
   }
 
+  sparkTest("can write as a single file, then read in .vcf file") {
+    val path = new File(tempDir, "test_single.vcf")
+    variants.saveAsVcf(path.getAbsolutePath, sortOnSave = false, asSingleFile = true)
+    assert(path.exists)
+    val vcRdd = sc.loadVcf("%s/test_single.vcf".format(tempDir))
+    assert(vcRdd.rdd.count === 1)
+    assert(vcRdd.sequences.records.size === 1)
+    assert(vcRdd.sequences.records(0).name === "chr11")
+  }
+
   sparkTest("joins SNV database annotation") {
     val v0 = Variant.newBuilder
       .setContigName("11")
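A companion test could exercise the analogous single-file path for reads; a hypothetical sketch in the style of this suite (the testFile helper, the small.sam fixture, and the saveAsSam parameter names are assumed here, not taken from this patch):

    sparkTest("can write a single SAM file, then read it back in") {
      val reads = sc.loadAlignments(testFile("small.sam")) // assumed fixture
      val path = new File(tempDir, "test_single.sam")
      reads.saveAsSam(path.getAbsolutePath, asSam = true, asSingleFile = true)
      assert(path.exists)
      val reloaded = sc.loadAlignments(path.getAbsolutePath)
      assert(reloaded.rdd.count === reads.rdd.count)
    }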