[ADAM-676] Clean up header issues for sharded files.
Resolves bigdatagenomics#676. In bigdatagenomics#964, we resolved the "header not set" issues for single-file
SAM/BAM output. This change propagates that fix to sharded SAM/BAM output and
to VCF.
fnothaft committed Aug 30, 2016
1 parent fa2283f · commit 8ece47f
Showing 9 changed files with 213 additions and 238 deletions.
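The heart of the change: the old code distributed the SAM/BAM header by mutating static state on the output-format companion objects from inside a mapPartitions pass, which is fragile on a real cluster. The new code writes the header once to a FILEPATH_head sidecar file, records that path in the Hadoop Configuration, and has every record writer re-read it from the shared file system. A minimal sketch of the handoff, assuming a hypothetical stageHeader helper; only the configuration key is taken from this diff:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object HeaderHandoff {
  // The key the new output formats read in getRecordWriter (from this diff).
  val HeaderPathKey = "org.bdgenomics.adam.rdd.read.bam_header_path"

  // Driver side: persist the header once, then point every task at it.
  def stageHeader(conf: Configuration, filePath: String): Path = {
    val headPath = new Path(filePath + "_head")
    // ... write the SAMFileHeader to headPath (SAM text or BGZF BAM) ...
    conf.set(HeaderPathKey, headPath.toString)
    headPath
  }

  // Executor side, inside getRecordWriter:
  //   val path = new Path(conf.get(HeaderPathKey))
  //   readSAMHeaderFrom(path, conf)
  // so no JVM-local mutable state ever has to reach the executors.
}
```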
Reads2Coverage.scala:
@@ -55,6 +55,8 @@ class Reads2CoverageArgs extends Args4jBase with ParquetArgs {
   var onlyNegativeStrands: Boolean = false
   @Args4jOption(required = false, name = "-only_positive_strands", usage = "Compute coverage for positive strands")
   var onlyPositiveStrands: Boolean = false
+  @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
+  var asSingleFile: Boolean = false
 }
 
 class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
@@ -80,7 +82,7 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCommand[Reads2CoverageArgs] {
     }
 
     finalReads.toCoverage(args.collapse)
-      .save(args.outputPath)
+      .save(args.outputPath, asSingleFile = args.asSingleFile)
   }
 }
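With the new flag, reads2coverage can emit one merged file instead of a Hadoop-style directory of part files. A hypothetical invocation (entry point and argument order assumed, not shown in this diff): `adam-submit reads2coverage reads.adam coverage.bed -single`.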

CoverageRDD.scala:
@@ -52,8 +52,8 @@ case class CoverageRDD(rdd: RDD[Coverage],
    *
    * @param filePath The location to write the output.
    */
-  def save(filePath: java.lang.String) = {
-    this.toFeatureRDD.save(filePath)
+  def save(filePath: java.lang.String, asSingleFile: java.lang.Boolean) = {
+    this.toFeatureRDD.save(filePath, asSingleFile = asSingleFile)
   }
 
   /**
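A usage sketch for the widened save signature (paths and surrounding code hypothetical; the java.lang.Boolean parameter type presumably eases calls from Java, though the diff does not say):

```scala
// Sketch: coverageRdd is a CoverageRDD, e.g. from alignments.toCoverage(collapse = true).
def saveCoverage(coverageRdd: CoverageRDD): Unit = {
  // Sharded output: a Hadoop-style directory with one part file per partition.
  coverageRdd.save("coverage.sharded.bed", asSingleFile = false)

  // One merged file, matching the new -single flag in Reads2Coverage.
  coverageRdd.save("coverage.bed", asSingleFile = true)
}
```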
ADAMBAMOutputFormat.scala:
@@ -28,49 +28,26 @@ import org.seqdoop.hadoop_bam.{
   SAMRecordWritable
 }
 
-object ADAMBAMOutputFormat extends Serializable {
-
-  private[read] var header: Option[SAMFileHeader] = None
-
-  /**
-   * Attaches a header to the ADAMSAMOutputFormat Hadoop writer. If a header has previously
-   * been attached, the header must be cleared first.
-   *
-   * @throws Exception Exception thrown if a SAM header has previously been attached, and not cleared.
-   *
-   * @param samHeader Header to attach.
-   *
-   * @see clearHeader
-   */
-  def addHeader(samHeader: SAMFileHeader) {
-    assert(header.isEmpty, "Cannot attach a new SAM header without first clearing the header.")
-    header = Some(samHeader)
-  }
-
-  /**
-   * Clears the attached header.
-   *
-   * @see addHeader
-   */
-  def clearHeader() {
-    header = None
-  }
-
-  /**
-   * Returns the current header.
-   *
-   * @return Current SAM header.
-   */
-  private[read] def getHeader: SAMFileHeader = {
-    assert(header.isDefined, "Cannot return header if not attached.")
-    header.get
-  }
-}
-
-class ADAMBAMOutputFormat[K]
-  extends KeyIgnoringBAMOutputFormat[K] with Serializable {
-
-  setSAMHeader(ADAMBAMOutputFormat.getHeader)
+class ADAMBAMOutputFormat[K]
+  extends KeyIgnoringBAMOutputFormat[K] with Serializable {
+
+  setWriteHeader(true)
+
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
+    val conf = context.getConfiguration()
+
+    // where is our header file?
+    val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))
+
+    // read the header file
+    readSAMHeaderFrom(path, conf)
+
+    // now that we have the header set, we can make a record writer
+    return new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
+      header,
+      true,
+      context)
+  }
 }
 
 class InstrumentedADAMBAMOutputFormat[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
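KeyIgnoringBAMOutputFormat, readSAMHeaderFrom, and KeyIgnoringBAMRecordWriter come from Hadoop-BAM; readSAMHeaderFrom fills in the inherited header field that the record writer then consumes. Note the implicit contract this creates: the driver must stage the header before any task opens a writer. A sketch of that precondition (the configuration key is from the diff; the helper is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// If the driver never set the key, conf.get(...) returns null and
// new Path(null) throws, so the misconfiguration surfaces immediately.
def headerPathOrFail(conf: Configuration): Path = {
  val raw = conf.get("org.bdgenomics.adam.rdd.read.bam_header_path")
  require(raw != null, "header path must be staged before writing BAM shards")
  new Path(raw)
}
```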
ADAMSAMOutputFormat.scala:
@@ -29,49 +29,26 @@ import org.seqdoop.hadoop_bam.{
   SAMRecordWritable
 }
 
-object ADAMSAMOutputFormat extends Serializable {
-
-  private[read] var header: Option[SAMFileHeader] = None
-
-  /**
-   * Attaches a header to the ADAMSAMOutputFormat Hadoop writer. If a header has previously
-   * been attached, the header must be cleared first.
-   *
-   * @throws Exception Exception thrown if a SAM header has previously been attached, and not cleared.
-   *
-   * @param samHeader Header to attach.
-   *
-   * @see clearHeader
-   */
-  def addHeader(samHeader: SAMFileHeader) {
-    assert(header.isEmpty, "Cannot attach a new SAM header without first clearing the header.")
-    header = Some(samHeader)
-  }
-
-  /**
-   * Clears the attached header.
-   *
-   * @see addHeader
-   */
-  def clearHeader() {
-    header = None
-  }
-
-  /**
-   * Returns the current header.
-   *
-   * @return Current SAM header.
-   */
-  private[read] def getHeader: SAMFileHeader = {
-    assert(header.isDefined, "Cannot return header if not attached.")
-    header.get
-  }
-}
-
-class ADAMSAMOutputFormat[K]
-  extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
-
-  setSAMHeader(ADAMSAMOutputFormat.getHeader)
+class ADAMSAMOutputFormat[K]
+  extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {
+
+  setWriteHeader(true)
+
+  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
+    val conf = context.getConfiguration()
+
+    // where is our header file?
+    val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))
+
+    // read the header file
+    readSAMHeaderFrom(path, conf)
+
+    // now that we have the header set, we can make a record writer
+    return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
+      header,
+      true,
+      context)
+  }
 }
 
 class InstrumentedADAMSAMOutputFormat[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
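The single-file code path later in this diff refers to header-less counterparts of these formats, so that only the driver-written _head file contributes a header to the merged output. A sketch of what such a variant plausibly looks like (class name and exact shape assumed, not visible in the hunks shown here):

```scala
class ADAMSAMOutputFormatHeaderLess[K]
  extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {

  // Same header plumbing as ADAMSAMOutputFormat, but never emit the header:
  // the merged single file gets its header from the "_head" sidecar instead.
  setWriteHeader(false)

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
    val conf = context.getConfiguration()
    val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))
    readSAMHeaderFrom(path, conf)
    return new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
      header,
      false, // writeHeader = false
      context)
  }
}
```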
AlignmentRecordRDD.scala:
@@ -315,112 +315,77 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
     // write file to disk
     val conf = rdd.context.hadoopConfiguration
 
-    if (!asSingleFile) {
-      val bcastHeader = rdd.context.broadcast(header)
-      val mp = rdd.mapPartitionsWithIndex((idx, iter) => {
-        log.info(s"Setting ${if (asSam) "SAM" else "BAM"} header for partition $idx")
-        val header = bcastHeader.value
-        synchronized {
-          // perform map partition call to ensure that the SAM/BAM header is set on all
-          // nodes in the cluster; see:
-          // https://github.com/bigdatagenomics/adam/issues/353,
-          // https://github.com/bigdatagenomics/adam/issues/676
-
-          asSam match {
-            case true =>
-              ADAMSAMOutputFormat.clearHeader()
-              ADAMSAMOutputFormat.addHeader(header)
-              log.info(s"Set SAM header for partition $idx")
-            case false =>
-              ADAMBAMOutputFormat.clearHeader()
-              ADAMBAMOutputFormat.addHeader(header)
-              log.info(s"Set BAM header for partition $idx")
-          }
-        }
-        Iterator[Int]()
-      }).count()
-
-      // force value check, ensure that computation happens
-      if (mp != 0) {
-        log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.")
-      }
-
-      // attach header to output format
-      asSam match {
-        case true =>
-          ADAMSAMOutputFormat.clearHeader()
-          ADAMSAMOutputFormat.addHeader(header)
-          log.info("Set SAM header on driver")
-        case false =>
-          ADAMBAMOutputFormat.clearHeader()
-          ADAMBAMOutputFormat.addHeader(header)
-          log.info("Set BAM header on driver")
-      }
-
-      asSam match {
-        case true =>
-          withKey.saveAsNewAPIHadoopFile(
-            filePath,
-            classOf[LongWritable],
-            classOf[SAMRecordWritable],
-            classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
-            conf
-          )
-        case false =>
-          withKey.saveAsNewAPIHadoopFile(
-            filePath,
-            classOf[LongWritable],
-            classOf[SAMRecordWritable],
-            classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
-            conf
-          )
-      }
+    // get file system
+    val headPath = new Path(filePath + "_head")
+    val tailPath = new Path(filePath + "_tail")
+    val outputPath = new Path(filePath)
+    val fs = headPath.getFileSystem(rdd.context.hadoopConfiguration)
+
+    // TIL: sam and bam are written in completely different ways!
+    if (asSam) {
+      SAMHeaderWriter.writeHeader(fs, headPath, header)
+    } else {
+
+      // get an output stream
+      val os = fs.create(headPath)
+        .asInstanceOf[OutputStream]
+
+      // create htsjdk specific streams for writing the bam header
+      val compressedOut: OutputStream = new BlockCompressedOutputStream(os, null)
+      val binaryCodec = new BinaryCodec(compressedOut);
+
+      // write a bam header - cribbed from Hadoop-BAM
+      binaryCodec.writeBytes("BAM\001".getBytes())
+      val sw: Writer = new StringWriter()
+      new SAMTextHeaderCodec().encode(sw, header)
+      binaryCodec.writeString(sw.toString, true, false)
+
+      // write sequence dictionary
+      val ssd = header.getSequenceDictionary
+      binaryCodec.writeInt(ssd.size())
+      ssd.getSequences
+        .toList
+        .foreach(r => {
+          binaryCodec.writeString(r.getSequenceName(), true, true)
+          binaryCodec.writeInt(r.getSequenceLength())
+        })
+
+      // flush and close all the streams
+      compressedOut.flush()
+      compressedOut.close()
+      os.flush()
+      os.close()
+    }
+
+    // set path to header file
+    conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)
+
+    if (!asSingleFile) {
+      if (asSam) {
+        withKey.saveAsNewAPIHadoopFile(
+          filePath,
+          classOf[LongWritable],
+          classOf[SAMRecordWritable],
+          classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
+          conf
+        )
+      } else {
+        withKey.saveAsNewAPIHadoopFile(
+          filePath,
+          classOf[LongWritable],
+          classOf[SAMRecordWritable],
+          classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
+          conf
+        )
+      }
+
+      // clean up the header after writing
+      fs.delete(headPath, true)
     } else {
       log.info(s"Writing single ${if (asSam) "SAM" else "BAM"} file (not Hadoop-style directory)")
 
-      val headPath = new Path(filePath + "_head")
-      val tailPath = new Path(filePath + "_tail")
-      val outputPath = new Path(filePath)
-      val fs = headPath.getFileSystem(rdd.context.hadoopConfiguration)
-
-      // TIL: sam and bam are written in completely different ways!
-      if (asSam) {
-        SAMHeaderWriter.writeHeader(fs, headPath, header)
-      } else {
-
-        // get an output stream
-        val os = fs.create(headPath)
-          .asInstanceOf[OutputStream]
-
-        // create htsjdk specific streams for writing the bam header
-        val compressedOut: OutputStream = new BlockCompressedOutputStream(os, null)
-        val binaryCodec = new BinaryCodec(compressedOut);
-
-        // write a bam header - cribbed from Hadoop-BAM
-        binaryCodec.writeBytes("BAM\001".getBytes())
-        val sw: Writer = new StringWriter()
-        new SAMTextHeaderCodec().encode(sw, header)
-        binaryCodec.writeString(sw.toString, true, false)
-
-        // write sequence dictionary
-        val ssd = header.getSequenceDictionary
-        binaryCodec.writeInt(ssd.size())
-        ssd.getSequences
-          .toList
-          .foreach(r => {
-            binaryCodec.writeString(r.getSequenceName(), true, true)
-            binaryCodec.writeInt(r.getSequenceLength())
-          })
-
-        // flush and close all the streams
-        compressedOut.flush()
-        compressedOut.close()
-        os.flush()
-        os.close()
-      }
-
+      // set path to header file
+      conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)
+
       // set up output format
       val headerLessOutputFormat = if (asSam) {
(Diff truncated: 4 of the 9 changed files are not shown.)
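Also in the truncated portion: for single-file output the records land under FILEPATH_tail via the header-less formats, and a final step concatenates the header and the shards into FILEPATH. A hedged sketch of that merge using the Hadoop FileSystem API (helper name and glob pattern are illustrative, not ADAM's actual code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.hadoop.io.IOUtils

// Stitch FILEPATH_head plus the part files under FILEPATH_tail into FILEPATH.
// Byte-level concatenation is valid for SAM text, and works for BAM shards
// too since each is a BGZF stream (modulo trailing BGZF EOF markers).
def concatHeadAndTail(conf: Configuration, filePath: String): Unit = {
  val fs = FileSystem.get(conf)
  val out = fs.create(new Path(filePath))
  try {
    val parts = new Path(filePath + "_head") +:
      fs.globStatus(new Path(filePath + "_tail/part-*"))
        .map(_.getPath)
        .sortBy(_.getName)
    parts.foreach { p =>
      val in = fs.open(p)
      try IOUtils.copyBytes(in, out, conf, false) finally in.close()
    }
  } finally out.close()
}
```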

