From 7d4b40932275ebf4146fb44d0e03acb3308e0cd9 Mon Sep 17 00:00:00 2001
From: Frank Austin Nothaft
Date: Tue, 12 Jan 2016 10:50:07 -0800
Subject: [PATCH] [ADAM-916] New strategy for writing header.

Resolves #916. Makes several modifications that should eliminate the header
attach issue when writing back to SAM/BAM:

* Writes the SAM/BAM header as a single file.
* Instead of trying to attach the SAM/BAM header to the output format via a
  singleton object, we pass the path to the SAM/BAM header file via the Hadoop
  configuration.
* The output format reads the header from HDFS when creating the record writer.
* At the end, once we've written the full RDD and the header file, we merge all
  of the files via Hadoop's FileSystem API.
---
 .../adam/rdd/read/ADAMBAMOutputFormat.scala   |  29 +-
 .../adam/rdd/read/ADAMSAMOutputFormat.scala   |  27 +-
 .../read/AlignmentRecordRDDFunctions.scala    | 322 +++++++++++++-----
 .../AlignmentRecordRDDFunctionsSuite.scala    |  14 +-
 4 files changed, 303 insertions(+), 89 deletions(-)

diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
index 08dff03944..e69ef1e48c 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMBAMOutputFormat.scala
@@ -17,11 +17,17 @@
  */
 package org.bdgenomics.adam.rdd.read
 
-import org.seqdoop.hadoop_bam.{ SAMRecordWritable, KeyIgnoringBAMOutputFormat }
 import htsjdk.samtools.SAMFileHeader
+import hbparquet.hadoop.util.ContextUtil
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
 import org.apache.spark.rdd.InstrumentedOutputFormat
-import org.apache.hadoop.mapreduce.OutputFormat
 import org.bdgenomics.adam.instrumentation.Timers
+import org.seqdoop.hadoop_bam.{
+  KeyIgnoringBAMOutputFormat,
+  KeyIgnoringBAMRecordWriter,
+  SAMRecordWritable
+}
 
 object ADAMBAMOutputFormat extends Serializable {
 
@@ -76,11 +82,26 @@ class InstrumentedADAMBAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
 class ADAMBAMOutputFormatHeaderLess[K]
     extends KeyIgnoringBAMOutputFormat[K] with Serializable {
 
-  setSAMHeader(ADAMBAMOutputFormat.getHeader)
   setWriteHeader(false)
+
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
+    val conf = ContextUtil.getConfiguration(context)
+
+    // where is our header file?
+    val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))
+
+    // read the header file
+    readSAMHeaderFrom(path, conf)
+
+    // now that we have the header set, we need to make a record writer
+    return new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
+      header,
+      false,
+      context)
+  }
 }
 
 class InstrumentedADAMBAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
   override def timerName(): String = Timers.WriteBAMRecord.timerName
   override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMBAMOutputFormatHeaderLess[K]]
-}
\ No newline at end of file
+}
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
index cea18a9d0b..cb26f7e663 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/ADAMSAMOutputFormat.scala
@@ -17,11 +17,18 @@
  */
 package org.bdgenomics.adam.rdd.read
 
-import org.seqdoop.hadoop_bam.{ SAMRecordWritable, KeyIgnoringAnySAMOutputFormat, SAMFormat }
 import htsjdk.samtools.SAMFileHeader
+import hbparquet.hadoop.util.ContextUtil
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
 import org.apache.spark.rdd.InstrumentedOutputFormat
 import org.bdgenomics.adam.instrumentation.Timers
-import org.apache.hadoop.mapreduce.OutputFormat
+import org.seqdoop.hadoop_bam.{
+  KeyIgnoringAnySAMOutputFormat,
+  KeyIgnoringSAMRecordWriter,
+  SAMFormat,
+  SAMRecordWritable
+}
 
 object ADAMSAMOutputFormat extends Serializable {
 
@@ -76,9 +83,23 @@ class InstrumentedADAMSAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
 class ADAMSAMOutputFormatHeaderLess[K]
     extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
 
-  setSAMHeader(ADAMSAMOutputFormat.getHeader)
   setWriteHeader(false)
 
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
+    val conf = ContextUtil.getConfiguration(context)
+
+    // where is our header file?
+ val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path")) + + // read the header file + readSAMHeaderFrom(path, conf) + + // now that we have the header set, we need to make a record reader + return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""), + header, + false, + context) + } } class InstrumentedADAMSAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] { diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala index ffbc4fc721..534f1bf1b6 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala @@ -17,8 +17,15 @@ */ package org.bdgenomics.adam.rdd.read -import java.io.{ OutputStream, StringWriter } -import htsjdk.samtools.{ SAMFileHeader, SAMTextHeaderCodec, SAMTextWriter, ValidationStringency } +import java.io.{ + InputStream, + OutputStream, + StringWriter, + Writer +} +import htsjdk.samtools._ +import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream } +import java.lang.reflect.InvocationTargetException import org.apache.avro.Schema import org.apache.avro.file.DataFileWriter import org.apache.avro.specific.{ SpecificDatumWriter, SpecificRecordBase } @@ -28,7 +35,7 @@ import org.apache.hadoop.io.LongWritable import org.apache.spark.SparkContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.MetricsContext._ -import org.apache.spark.rdd.{ PartitionPruningRDD, RDD } +import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.bdgenomics.adam.algorithms.consensus.{ ConsensusGenerator, ConsensusGeneratorFromReads } import org.bdgenomics.adam.converters.AlignmentRecordConverter @@ -42,9 +49,9 @@ import org.bdgenomics.adam.rich.RichAlignmentRecord import org.bdgenomics.adam.util.MapTools import org.bdgenomics.formats.avro._ import org.seqdoop.hadoop_bam.SAMRecordWritable -import scala.reflect.ClassTag - +import scala.annotation.tailrec import scala.language.implicitConversions +import scala.reflect.ClassTag class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) extends ADAMSequenceDictionaryRDDAggregator[AlignmentRecord](rdd) { @@ -297,7 +304,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) rgd: RecordGroupDictionary, asSam: Boolean = true, asSingleFile: Boolean = false, - isSorted: Boolean = false) = SAMSave.time { + isSorted: Boolean = false): Unit = SAMSave.time { // if the file is sorted, make sure the sequence dictionary is sorted val sdFinal = if (isSorted) { @@ -314,51 +321,51 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) // add keys to our records val withKey = convertRecords.keyBy(v => new LongWritable(v.get.getAlignmentStart)) - val bcastHeader = rdd.context.broadcast(header) - val mp = rdd.mapPartitionsWithIndex((idx, iter) => { - log.info(s"Setting ${if (asSam) "SAM" else "BAM"} header for partition $idx") - val header = bcastHeader.value - synchronized { - // perform map partition call to ensure that the SAM/BAM header is set on all - // nodes in the cluster; see: - // https://github.com/bigdatagenomics/adam/issues/353, - // https://github.com/bigdatagenomics/adam/issues/676 - - asSam match { - case true => - ADAMSAMOutputFormat.clearHeader() - ADAMSAMOutputFormat.addHeader(header) - log.info(s"Set SAM header for 
partition $idx") - case false => - ADAMBAMOutputFormat.clearHeader() - ADAMBAMOutputFormat.addHeader(header) - log.info(s"Set BAM header for partition $idx") - } - } - Iterator[Int]() - }).count() - - // force value check, ensure that computation happens - if (mp != 0) { - log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.") - } - - // attach header to output format - asSam match { - case true => - ADAMSAMOutputFormat.clearHeader() - ADAMSAMOutputFormat.addHeader(header) - log.info("Set SAM header on driver") - case false => - ADAMBAMOutputFormat.clearHeader() - ADAMBAMOutputFormat.addHeader(header) - log.info("Set BAM header on driver") - } - // write file to disk val conf = rdd.context.hadoopConfiguration if (!asSingleFile) { + val bcastHeader = rdd.context.broadcast(header) + val mp = rdd.mapPartitionsWithIndex((idx, iter) => { + log.info(s"Setting ${if (asSam) "SAM" else "BAM"} header for partition $idx") + val header = bcastHeader.value + synchronized { + // perform map partition call to ensure that the SAM/BAM header is set on all + // nodes in the cluster; see: + // https://github.com/bigdatagenomics/adam/issues/353, + // https://github.com/bigdatagenomics/adam/issues/676 + + asSam match { + case true => + ADAMSAMOutputFormat.clearHeader() + ADAMSAMOutputFormat.addHeader(header) + log.info(s"Set SAM header for partition $idx") + case false => + ADAMBAMOutputFormat.clearHeader() + ADAMBAMOutputFormat.addHeader(header) + log.info(s"Set BAM header for partition $idx") + } + } + Iterator[Int]() + }).count() + + // force value check, ensure that computation happens + if (mp != 0) { + log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.") + } + + // attach header to output format + asSam match { + case true => + ADAMSAMOutputFormat.clearHeader() + ADAMSAMOutputFormat.addHeader(header) + log.info("Set SAM header on driver") + case false => + ADAMBAMOutputFormat.clearHeader() + ADAMBAMOutputFormat.addHeader(header) + log.info("Set BAM header on driver") + } + asSam match { case true => withKey.saveAsNewAPIHadoopFile( @@ -380,53 +387,210 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) } } else { log.info(s"Writing single ${if (asSam) "SAM" else "BAM"} file (not Hadoop-style directory)") - val (outputFormat, headerLessOutputFormat) = asSam match { - case true => - ( - classOf[InstrumentedADAMSAMOutputFormat[LongWritable]], - classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]] - ) - case false => - ( - classOf[InstrumentedADAMBAMOutputFormat[LongWritable]], - classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]] - ) + val fs = FileSystem.get(conf) + + val headPath = new Path(filePath + "_head") + val tailPath = new Path(filePath + "_tail") + val outputPath = new Path(filePath) + + // get an output stream + val os = fs.create(headPath) + .asInstanceOf[OutputStream] + + // TIL: sam and bam are written in completely different ways! 
+ if (asSam) { + val sw: Writer = new StringWriter() + val stw = new SAMTextWriter(os) + stw.setHeader(header) + stw.close() + } else { + // create htsjdk specific streams for writing the bam header + val compressedOut: OutputStream = new BlockCompressedOutputStream(os, null) + val binaryCodec = new BinaryCodec(compressedOut); + + // write a bam header - cribbed from Hadoop-BAM + binaryCodec.writeBytes("BAM\001".getBytes()) + val sw: Writer = new StringWriter() + new SAMTextHeaderCodec().encode(sw, header) + binaryCodec.writeString(sw.toString, true, false) + + // write sequence dictionary + val ssd = header.getSequenceDictionary() + binaryCodec.writeInt(ssd.size()) + ssd.getSequences + .toList + .foreach(r => { + binaryCodec.writeString(r.getSequenceName(), true, true) + binaryCodec.writeInt(r.getSequenceLength()) + }) + + // flush and close all the streams + compressedOut.flush() + compressedOut.close() } - val headPath = filePath + "_head" - val tailPath = filePath + "_tail" + // more flushing and closing + os.flush() + os.close() - PartitionPruningRDD.create(withKey, i => { i == 0 }).saveAsNewAPIHadoopFile( - headPath, - classOf[LongWritable], - classOf[SAMRecordWritable], - outputFormat, - conf - ) - PartitionPruningRDD.create(withKey, i => { i != 0 }).saveAsNewAPIHadoopFile( - tailPath, + // set path to header file + conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString) + + // set up output format + val headerLessOutputFormat = if (asSam) { + classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]] + } else { + classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]] + } + + // save rdd + withKey.saveAsNewAPIHadoopFile( + tailPath.toString, classOf[LongWritable], classOf[SAMRecordWritable], headerLessOutputFormat, conf ) - val fs = FileSystem.get(conf) + // get a list of all of the files in the tail file + val tailFiles = fs.globStatus(new Path("%s/part-*".format(tailPath))) + .toSeq + .map(_.getPath) + .sortBy(_.getName) + .toArray + + // try to merge this via the fs api, which should guarantee ordering...? + // however! this function is not implemented on all platforms, hence the try. + try { + + // concat files together + // we need to do this through reflection because the concat method is + // NOT in hadoop 1.x + + // find the concat method + val fsMethods = classOf[FileSystem].getDeclaredMethods + val filteredMethods = fsMethods.filter(_.getName == "concat") + + // did we find the method? if not, throw an exception + if (filteredMethods.size == 0) { + throw new IllegalStateException( + "Could not find concat method in FileSystem. 
Methods included:\n" + + fsMethods.map(_.getName).mkString("\n") + + "\nAre you running Hadoop 1.x?") + } else if (filteredMethods.size > 1) { + throw new IllegalStateException( + "Found multiple concat methods in FileSystem:\n%s".format( + filteredMethods.map(_.getName).mkString("\n"))) + } - fs.listStatus(headPath) - .find(_.getPath.getName.startsWith("part-r")) - .map(fileStatus => FileUtil.copy(fs, fileStatus.getPath, fs, tailPath + "/head", false, false, conf)) - fs.delete(headPath, true) + // since we've done our checking, let's get the method now + val concatMethod = filteredMethods.head + + // we need to move the head file into the tailFiles directory + // this is a requirement of the concat method + val newHeadPath = new Path("%s/header".format(tailPath)) + fs.rename(headPath, newHeadPath) + + // invoke the method on the fs instance + // if we are on hadoop 2.x, this makes the call: + // + // fs.concat(headPath, tailFiles) + try { + concatMethod.invoke(fs, newHeadPath, tailFiles) + } catch { + case ite: InvocationTargetException => { + // move the head file back - essentially, unroll the prep for concat + fs.rename(newHeadPath, headPath) + + // the only reason we have this try/catch is to unwrap the wrapped + // exception and rethrow. this gives clearer logging messages + throw ite.getTargetException + } + } - FileUtil.copyMerge(fs, tailPath, fs, filePath, true, conf, null) - fs.delete(tailPath, true) + // move concated file + fs.rename(newHeadPath, outputPath) - } + // delete tail files + fs.delete(tailPath, true) + } catch { + case e: Throwable => { - } + log.warn("Caught exception when merging via Hadoop FileSystem API:\n%s".format(e)) + log.warn("Retrying as manual copy from the driver which will degrade performance.") + + // doing this correctly is surprisingly hard + // specifically, copy merge does not care about ordering, which is + // fine if your files are unordered, but if the blocks in the file + // _are_ ordered, then hahahahahahahahahaha. GOOD. TIMES. + // + // fortunately, the blocks in our file are ordered + // the performance of this section is hilarious + // + // specifically, the performance is hilariously bad + // + // but! it is correct. + + // open our output file + val os = fs.create(outputPath) + + // prepend our header to the list of files to copy + val filesToCopy = Seq(headPath) ++ tailFiles.toSeq + + // here is a byte array for copying + val ba = new Array[Byte](1024) - implicit def str2Path(str: String): Path = new Path(str) + @tailrec def copy(is: InputStream, + os: OutputStream) { + + // make a read + val bytesRead = is.read(ba) + + // did our read succeed? 
if so, write to output stream + // and continue + if (bytesRead >= 0) { + os.write(ba, 0, bytesRead) + + copy(is, os) + } + } + + // loop over allllll the files and copy them + val numFiles = filesToCopy.length + var filesCopied = 1 + filesToCopy.foreach(p => { + + // print a bit of progress logging + log.info("Copying file %s, file %d of %d.".format( + p.toString, + filesCopied, + numFiles)) + + // open our input file + val is = fs.open(p) + + // until we are out of bytes, copy + copy(is, os) + + // close our input stream + is.close() + + // increment file copy count + filesCopied += 1 + }) + + // flush and close the output stream + os.flush() + os.close() + + // delete temp files + fs.delete(headPath, true) + fs.delete(tailPath, true) + } + } + } + } def getSequenceRecordsFromElement(elem: AlignmentRecord): Set[SequenceRecord] = { SequenceRecord.fromADAMRecord(elem).toSet diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctionsSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctionsSuite.scala index 4904ac6019..32c3b7a2f7 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctionsSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctionsSuite.scala @@ -298,7 +298,7 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite { checkFiles(resourcePath("ordered.sam"), actualSortedPath) } - sparkTest("write single sam file back") { + def testBQSR(asSam: Boolean, filename: String) { val inputPath = resourcePath("bqsr1.sam") val tempFile = Files.createTempDirectory("bqsr1") val rRdd = sc.loadAlignments(inputPath) @@ -306,12 +306,12 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite { val sd = rRdd.sequences val rgd = rRdd.recordGroups rdd.cache() - rdd.adamSAMSave(tempFile.toAbsolutePath.toString + "/bqsr1.sam", + rdd.adamSAMSave("%s/%s".format(tempFile.toAbsolutePath.toString, filename), sd, rgd, asSam = true, asSingleFile = true) - val rdd2 = sc.loadAlignments(tempFile.toAbsolutePath.toString + "/bqsr1.sam").rdd + val rdd2 = sc.loadAlignments("%s/%s".format(tempFile.toAbsolutePath.toString, filename)).rdd rdd2.cache() val (fsp1, fsf1) = rdd.adamFlagStat() @@ -378,4 +378,12 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite { } }) } + + sparkTest("write single sam file back") { + testBQSR(true, "bqsr1.sam") + } + + sparkTest("write single bam file back") { + testBQSR(false, "bqsr1.bam") + } }
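
The header hand-off at the heart of this change works as follows: the driver writes the SAM/BAM header to a side file, records that file's location under the org.bdgenomics.adam.rdd.read.bam_header_path configuration key, and each record writer re-reads the header from that location before any records are written. The sketch below is a simplified, hypothetical illustration of that round trip using text headers only; the patch itself writes the header with SAMTextWriter or BinaryCodec and reads it back through Hadoop-BAM's readSAMHeaderFrom, and the object and method names here are not part of ADAM.

import java.io.OutputStreamWriter

import htsjdk.samtools.{ SAMFileHeader, SAMTextHeaderCodec }
import htsjdk.samtools.util.BufferedLineReader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object HeaderHandOffSketch {

  // configuration key used by the patch to point record writers at the header file
  val headerPathKey = "org.bdgenomics.adam.rdd.read.bam_header_path"

  // driver side: persist the header as a side file and advertise its location
  def writeHeader(header: SAMFileHeader, headPath: Path, conf: Configuration): Unit = {
    val fs = headPath.getFileSystem(conf)
    val writer = new OutputStreamWriter(fs.create(headPath))
    new SAMTextHeaderCodec().encode(writer, header)
    writer.close()
    conf.set(headerPathKey, headPath.toString)
  }

  // executor side: each record writer re-reads the header before emitting records
  def readHeader(conf: Configuration): SAMFileHeader = {
    val path = new Path(conf.get(headerPathKey))
    val fs = path.getFileSystem(conf)
    val reader = new BufferedLineReader(fs.open(path))
    try {
      new SAMTextHeaderCodec().decode(reader, path.toString)
    } finally {
      reader.close()
    }
  }
}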
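
The single-file merge first tries FileSystem.concat, which only exists on Hadoop 2.x and is only implemented by some filesystems (HDFS in particular; the local filesystem throws UnsupportedOperationException at runtime), which is why the patch looks the method up reflectively and keeps the manual stream-copy fallback. A minimal, hypothetical sketch of that lookup, assuming Hadoop 2.x behavior:

import java.lang.reflect.InvocationTargetException

import org.apache.hadoop.fs.{ FileSystem, Path }

object ConcatSketch {

  // look up FileSystem.concat reflectively so this compiles against Hadoop 1.x,
  // where the method does not exist; on Hadoop 2.x this is fs.concat(target, parts)
  def concatOrThrow(fs: FileSystem, target: Path, parts: Array[Path]): Unit = {
    val concat = classOf[FileSystem].getDeclaredMethods
      .find(_.getName == "concat")
      .getOrElse(throw new IllegalStateException(
        "FileSystem.concat is not available; are you running Hadoop 1.x?"))

    try {
      concat.invoke(fs, target, parts)
    } catch {
      case ite: InvocationTargetException =>
        // unwrap so that the underlying filesystem error is what callers see
        throw ite.getTargetException
    }
  }
}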
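
End to end, the single-file code path is driven the same way the updated test suite drives it. A hypothetical driver program might look like the following; the application name and the input and output paths are placeholders, and the snippet assumes the implicits from org.bdgenomics.adam.rdd.ADAMContext are in scope, as in the tests.

import org.apache.spark.{ SparkConf, SparkContext }
import org.bdgenomics.adam.rdd.ADAMContext._

object SingleFileSaveExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("single-file-bam"))

    // load reads plus their sequence and record group dictionaries
    val reads = sc.loadAlignments("hdfs:///data/sample.sam")

    // asSingleFile = true writes a _head file and a _tail directory, then
    // merges them into one file; asSam = false selects BAM output
    reads.rdd.adamSAMSave("hdfs:///data/sample.bam",
      reads.sequences,
      reads.recordGroups,
      asSam = false,
      asSingleFile = true)

    sc.stop()
  }
}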