Skip to content

Commit

Permalink
[ADAM-962] Fix corrupt single-file BAM output.
Browse files Browse the repository at this point in the history
It seems like we were doing something incorrectly when writing the header.
Additionally, we now write a correct end-of-file. Resolves #962.
  • Loading branch information
fnothaft committed Feb 26, 2016
1 parent b29d204 commit 551c559
Showing 1 changed file with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
*/
package org.bdgenomics.adam.rdd.read

import htsjdk.samtools._
import htsjdk.samtools.util.{
BinaryCodec,
BlockCompressedOutputStream,
BlockCompressedStreamConstants
}
import java.io.{
InputStream,
OutputStream,
StringWriter,
Writer
}
import htsjdk.samtools._
import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream }
import java.lang.reflect.InvocationTargetException
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
Expand All @@ -48,7 +52,8 @@ import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, ADAMSequenceDictionaryRDDAggre
import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.adam.util.MapTools
import org.bdgenomics.formats.avro._
import org.seqdoop.hadoop_bam.SAMRecordWritable
import org.seqdoop.hadoop_bam.{ SAMFormat, SAMRecordWritable }
import org.seqdoop.hadoop_bam.util.SAMOutputPreparer
import scala.annotation.tailrec
import scala.language.implicitConversions
import scala.reflect.ClassTag
Expand Down Expand Up @@ -430,7 +435,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
compressedOut.close()
}

// more flushing and closing
os.flush()
os.close()

Expand Down Expand Up @@ -535,31 +539,31 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
// open our output file
val os = fs.create(outputPath)

// prepend our header to the list of files to copy
val filesToCopy = Seq(headPath) ++ tailFiles.toSeq
// prepare output
new SAMOutputPreparer().prepareForRecords(os, SAMFormat.BAM, header);

// here is a byte array for copying
val ba = new Array[Byte](1024)

@tailrec def copy(is: InputStream,
os: OutputStream) {
los: OutputStream) {

// make a read
val bytesRead = is.read(ba)

// did our read succeed? if so, write to output stream
// and continue
if (bytesRead >= 0) {
os.write(ba, 0, bytesRead)
los.write(ba, 0, bytesRead)

copy(is, os)
copy(is, los)
}
}

// loop over allllll the files and copy them
val numFiles = filesToCopy.length
var filesCopied = 1
filesToCopy.foreach(p => {
tailFiles.toSeq.foreach(p => {

// print a bit of progress logging
log.info("Copying file %s, file %d of %d.".format(
Expand All @@ -580,6 +584,9 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
filesCopied += 1
})

// finish the file off
os.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);

// flush and close the output stream
os.flush()
os.close()
Expand Down

0 comments on commit 551c559

Please sign in to comment.