[ADAM-1009] Port single file save code over to VCF.
Factor single file save code out of AlignmentRecordRDD. Resolves bigdatagenomics#1009.
fnothaft committed Aug 8, 2016
1 parent e7e1adf commit 4260568
Showing 4 changed files with 164 additions and 126 deletions.
ADAM2Vcf.scala
@@ -54,6 +54,9 @@ class ADAM2VcfArgs extends Args4jBase with ParquetArgs {

@Args4jOption(required = false, name = "-sort_on_save", usage = "Sort the VCF output.")
var sort: Boolean = false

@Args4jOption(required = false, name = "-single", usage = "Save as a single VCF file.")
var single: Boolean = false
}

class ADAM2Vcf(val args: ADAM2VcfArgs) extends BDGSparkCommand[ADAM2VcfArgs] with DictionaryCommand with Logging {
@@ -80,6 +83,8 @@ class ADAM2Vcf(val args: ADAM2VcfArgs) extends BDGSparkCommand[ADAM2VcfArgs] wit
variantContexts
}

variantContextsToSave.saveAsVcf(args.outputPath, sortOnSave = args.sort)
variantContextsToSave.saveAsVcf(args.outputPath,
sortOnSave = args.sort,
asSingleFile = args.single)
}
}
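
The new -single flag surfaces the asSingleFile option on the command line. A hedged example invocation (the adam-submit launcher and the positional argument order are assumptions based on the surrounding CLI code, not shown in this diff):

adam-submit adam2vcf genotypes.adam variants.vcf -single -sort_on_save
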
126 changes: 126 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/FileMerger.scala
@@ -0,0 +1,126 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd

import htsjdk.samtools.util.BlockCompressedStreamConstants
import java.io.{ InputStream, OutputStream }
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.bdgenomics.utils.misc.Logging
import scala.annotation.tailrec

private[rdd] object FileMerger extends Logging {

def mergeFiles(fs: FileSystem,
outputPath: Path,
tailPath: Path,
optHeaderPath: Option[Path] = None,
writeEmptyGzipBlock: Boolean = false,
bufferSize: Int = 1024) {

// get a list of all of the files in the tail file
val tailFiles = fs.globStatus(new Path("%s/part-*".format(tailPath)))
.toSeq
.map(_.getPath)
.sortBy(_.getName)
.toArray

// doing this correctly is surprisingly hard
// specifically, copy merge does not care about ordering, which is
// fine if your files are unordered, but if the blocks in the file
// _are_ ordered, then hahahahahahahahahaha. GOOD. TIMES.
//
// fortunately, the blocks in our file are ordered
// the performance of this section is hilarious
//
// specifically, the performance is hilariously bad
//
// but! it is correct.

// open our output file
val os = fs.create(outputPath)

// here is a byte array for copying
val ba = new Array[Byte](bufferSize)

@tailrec def copy(is: InputStream,
los: OutputStream) {

// make a read
val bytesRead = is.read(ba)

// did our read succeed? if so, write to output stream
// and continue
if (bytesRead >= 0) {
los.write(ba, 0, bytesRead)

copy(is, los)
}
}

// optionally copy the header
optHeaderPath.foreach(p => {
log.info("Copying header file (%s)".format(p))

// open our input file
val is = fs.open(p)

// until we are out of bytes, copy
copy(is, os)

// close our input stream
is.close()
})

// loop over allllll the files and copy them
val numFiles = tailFiles.length
var filesCopied = 1
tailFiles.toSeq.foreach(p => {

// print a bit of progress logging
log.info("Copying file %s, file %d of %d.".format(
p.toString,
filesCopied,
numFiles))

// open our input file
val is = fs.open(p)

// until we are out of bytes, copy
copy(is, os)

// close our input stream
is.close()

// increment file copy count
filesCopied += 1
})

// finish the file off
if (writeEmptyGzipBlock) {
os.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
}

// flush and close the output stream
os.flush()
os.close()

// delete temp files
optHeaderPath.foreach(headPath => fs.delete(headPath, true))
fs.delete(tailPath, true)
}
}
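
For reference, a minimal sketch of how the new FileMerger is intended to be driven; the Hadoop paths and the local Configuration below are illustrative assumptions, while the real call sites are in the two RDD classes changed next:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.bdgenomics.adam.rdd.FileMerger

// a directory of ordered part files ("sample.sam_tail/part-*") plus a separately
// written header file, concatenated into a single output file on the driver
val conf = new Configuration()
val fs = FileSystem.get(conf)
FileMerger.mergeFiles(fs,
  outputPath = new Path("sample.sam"),
  tailPath = new Path("sample.sam_tail"),
  optHeaderPath = Some(new Path("sample.sam_head")),
  writeEmptyGzipBlock = false)
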
AlignmentRecordRDD.scala
@@ -44,6 +44,7 @@ import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.{
AvroReadGroupGenomicRDD,
ADAMSaveAnyArgs,
FileMerger,
JavaSaveArgs,
Unaligned
}
@@ -55,8 +56,6 @@ import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.formats.avro._
import org.bdgenomics.utils.misc.Logging
import org.seqdoop.hadoop_bam.{ SAMFormat, SAMRecordWritable }
import org.seqdoop.hadoop_bam.util.SAMOutputPreparer
import scala.annotation.tailrec
import scala.language.implicitConversions
import scala.math.{ abs, min }

@@ -412,122 +411,11 @@ sealed trait AlignmentRecordRDD extends AvroReadGroupGenomicRDD[AlignmentRecord,
conf
)

// get a list of all of the files in the tail file
val tailFiles = fs.globStatus(new Path("%s/part-*".format(tailPath)))
.toSeq
.map(_.getPath)
.sortBy(_.getName)
.toArray

// try to merge this via the fs api, which should guarantee ordering...?
// however! this function is not implemented on all platforms, hence the try.
try {

// we need to move the head file into the tailFiles directory
// this is a requirement of the concat method
val newHeadPath = new Path("%s/header".format(tailPath))
fs.rename(headPath, newHeadPath)

try {
fs.concat(newHeadPath, tailFiles)
} catch {
case t: Throwable => {
// move the head file back - essentially, unroll the prep for concat
fs.rename(newHeadPath, headPath)
throw t
}
}

// move concatenated file
fs.rename(newHeadPath, outputPath)

// delete tail files
fs.delete(tailPath, true)
} catch {
case e: Throwable => {

log.warn("Caught exception when merging via Hadoop FileSystem API:\n%s".format(e))
log.warn("Retrying as manual copy from the driver which will degrade performance.")

// doing this correctly is surprisingly hard
// specifically, copy merge does not care about ordering, which is
// fine if your files are unordered, but if the blocks in the file
// _are_ ordered, then hahahahahahahahahaha. GOOD. TIMES.
//
// fortunately, the blocks in our file are ordered
// the performance of this section is hilarious
//
// specifically, the performance is hilariously bad
//
// but! it is correct.

// open our output file
val os = fs.create(outputPath)

// prepare output
val format = if (asSam) {
SAMFormat.SAM
} else {
SAMFormat.BAM
}
new SAMOutputPreparer().prepareForRecords(os, format, header);

// here is a byte array for copying
val ba = new Array[Byte](1024)

@tailrec def copy(is: InputStream,
los: OutputStream) {

// make a read
val bytesRead = is.read(ba)

// did our read succeed? if so, write to output stream
// and continue
if (bytesRead >= 0) {
los.write(ba, 0, bytesRead)

copy(is, los)
}
}

// loop over allllll the files and copy them
val numFiles = tailFiles.length
var filesCopied = 1
tailFiles.toSeq.foreach(p => {

// print a bit of progress logging
log.info("Copying file %s, file %d of %d.".format(
p.toString,
filesCopied,
numFiles))

// open our input file
val is = fs.open(p)

// until we are out of bytes, copy
copy(is, os)

// close our input stream
is.close()

// increment file copy count
filesCopied += 1
})

// finish the file off
if (!asSam) {
os.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
}

// flush and close the output stream
os.flush()
os.close()

// delete temp files
fs.delete(headPath, true)
fs.delete(tailPath, true)
}
}
FileMerger.mergeFiles(fs,
outputPath,
tailPath,
optHeaderPath = Some(headPath),
writeEmptyGzipBlock = !asSam)
}
}

VariantContextRDD.scala
@@ -18,6 +18,7 @@
package org.bdgenomics.adam.rdd.variation

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.VariantContextConverter
@@ -26,7 +27,7 @@ import org.bdgenomics.adam.models.{
SequenceDictionary,
VariantContext
}
import org.bdgenomics.adam.rdd.MultisampleGenomicRDD
import org.bdgenomics.adam.rdd.{ FileMerger, MultisampleGenomicRDD }
import org.bdgenomics.formats.avro.Sample
import org.bdgenomics.utils.cli.SaveArgs
import org.seqdoop.hadoop_bam._
@@ -102,7 +103,8 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
* @param sortOnSave Whether to sort before saving. Default is false (no sort).
*/
def saveAsVcf(filePath: String,
sortOnSave: Boolean = false) {
sortOnSave: Boolean = false,
asSingleFile: Boolean = false) {
val vcfFormat = VCFFormat.inferFromFilePath(filePath)
assert(vcfFormat == VCFFormat.VCF, "BCF not yet supported") // TODO: Add BCF support

@@ -155,11 +157,28 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
// save to disk
val conf = rdd.context.hadoopConfiguration
conf.set(VCFOutputFormat.OUTPUT_VCF_FORMAT_PROPERTY, vcfFormat.toString)
writableVCs.saveAsNewAPIHadoopFile(
filePath,
classOf[LongWritable], classOf[VariantContextWritable], classOf[ADAMVCFOutputFormat[LongWritable]],
conf
)

if (asSingleFile) {

// write shards to disk
val tailPath = "%s_tail".format(filePath)
writableVCs.saveAsNewAPIHadoopFile(
tailPath,
classOf[LongWritable], classOf[VariantContextWritable], classOf[ADAMVCFOutputFormat[LongWritable]],
conf
)

// merge shards
FileMerger.mergeFiles(FileSystem.get(conf),
new Path(filePath),
new Path(tailPath))
} else {
writableVCs.saveAsNewAPIHadoopFile(
filePath,
classOf[LongWritable], classOf[VariantContextWritable], classOf[ADAMVCFOutputFormat[LongWritable]],
conf
)
}

log.info("Write %d records".format(writableVCs.count()))
rdd.unpersist()
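
With this change a caller can request one merged VCF directly; a minimal usage sketch, assuming variantContexts is an existing VariantContextRDD (the variable and file names are illustrative):

// writes the shards to sample.vcf_tail, then FileMerger concatenates them into sample.vcf
variantContexts.saveAsVcf("sample.vcf",
  sortOnSave = true,
  asSingleFile = true)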
