[ADAM-1554] Support saving BGZF VCF output.
Resolves #1554. Additionally:

* Moves VCF file extension logic into ADAM, since Hadoop-BAM does not recognize
  .bgzf as a valid BGZF extension
* Refactors VCF output formats to remove the need for ADAMHeaderlessVCFOutputFormat
fnothaft authored and heuermh committed Jul 17, 2017
1 parent a1b585a commit 406d1e3
Showing 9 changed files with 192 additions and 44 deletions.
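
For orientation before the diff: a minimal usage sketch of what this change enables, matching the `loadVcf`/`saveAsVcf` calls exercised in the tests below. The input and output paths are placeholders.

```scala
import htsjdk.samtools.ValidationStringency
import org.bdgenomics.adam.rdd.ADAMContext._

// Load a VCF, then write it back out block-compressed. After this change,
// a .vcf.gz or .vcf.bgz extension selects BGZF-compressed VCF output.
// "in.vcf" and "out.vcf.bgz" are placeholder paths.
val vcs = sc.loadVcf("in.vcf")
vcs.saveAsVcf("out.vcf.bgz",
  asSingleFile = true,
  deferMerging = false,
  disableFastConcat = false,
  stringency = ValidationStringency.LENIENT)
```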
@@ -176,6 +176,25 @@ private class FileFilter(private val name: String) extends PathFilter {
}
}

/**
* A filter to run on globs/directories that finds all files that do not start
* with a given string.
*
* @param prefix The prefix to search for. Files whose names start with this
*   prefix are discarded.
*/
private class NoPrefixFileFilter(private val prefix: String) extends PathFilter {

/**
* @param path Path to evaluate.
* @return Returns true if the name of the path does not start with the prefix
*   passed to the constructor.
*/
def accept(path: Path): Boolean = {
!path.getName.startsWith(prefix)
}
}

/**
* The ADAMContext provides functions on top of a SparkContext for loading genomic data.
*
@@ -207,7 +226,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
*/
private[rdd] def loadVcfMetadata(pathName: String): (SequenceDictionary, Seq[Sample], Seq[VCFHeaderLine]) = {
// get the paths to all vcfs
-    val files = getFsAndFiles(new Path(pathName))
+    val files = getFsAndFilesWithFilter(pathName, new NoPrefixFileFilter("_"))

// load the metadata
files.map(p => loadSingleVcfMetadata(p.toString)).reduce((p1, p2) => {
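
For illustration, a sketch of how a Hadoop `PathFilter` like `NoPrefixFileFilter` composes with the `FileSystem` listing API to skip sidecar files such as `_header` and `_SUCCESS`. This is a hedged example: the filter is declared private inside ADAM, and the directory path here is a placeholder.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }

// List only data shards of a sharded VCF directory, skipping any file
// whose name starts with "_"; the path below is a placeholder.
val dir = new Path("hdfs:///data/sample.vcf")
val fs = dir.getFileSystem(new Configuration())
val dataFiles = fs.listStatus(dir, new NoPrefixFileFilter("_"))
  .map(_.getPath)
```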
@@ -17,6 +17,7 @@
*/
package org.bdgenomics.adam.rdd

import htsjdk.samtools.util.BlockCompressedOutputStream
import htsjdk.variant.variantcontext.writer.{
Options,
VariantContextWriterBuilder
@@ -36,14 +37,19 @@ private[rdd] object VCFHeaderUtils {
* @param header The header to write.
* @param path The path to write it to.
* @param conf The configuration to get the file system.
* @param isCompressed If true, writes as BGZF (block gzip).
* @param noEof If writing a compressed file, omits the terminating BGZF EOF
* block. Ignored if output is uncompressed.
*/
def write(header: VCFHeader,
path: Path,
-            conf: Configuration) {
+            conf: Configuration,
+            isCompressed: Boolean,
+            noEof: Boolean) {

val fs = path.getFileSystem(conf)

-    write(header, path, fs)
+    write(header, path, fs, isCompressed, noEof)
}

/**
@@ -52,13 +58,25 @@
* @param header The header to write.
* @param path The path to write it to.
* @param fs The file system to write to.
* @param isCompressed If true, writes as BGZF (block gzip).
* @param noEof If writing a compressed file, omits the terminating BGZF EOF
* block. Ignored if output is uncompressed.
*/
def write(header: VCFHeader,
path: Path,
-            fs: FileSystem) {
+            fs: FileSystem,
+            isCompressed: Boolean,
+            noEof: Boolean) {

// get an output stream
-    val os = fs.create(path)
+    val fos = fs.create(path)
+    val os = if (isCompressed) {
+      // BGZF stream requires java.io.File
+      // we can't get one in hadoop land, so it is OK to provide a null file
+      new BlockCompressedOutputStream(fos, null)
+    } else {
+      fos
+    }

// build a vcw
val vcw = new VariantContextWriterBuilder()
@@ -71,7 +89,22 @@
vcw.writeHeader(header)

// close the writer
-    // vcw.close calls close on the underlying stream, see ADAM-1337
-    vcw.close()
+    // originally, this was vcw.close, which calls close on the underlying
+    // stream (see ADAM-1337 for a longer history). however, to support BGZF, we
+    // need to separate out the calls. specifically, calling close on the vcw
+    // calls close on the BlockCompressedOutputStream, which leads to the stream
+    // writing the BGZF EOF indicator (an empty BGZF block).
+    //
+    // if we are writing an uncompressed VCF or a sharded BGZF'ed VCF, this is
+    // OK. if we write the BGZF EOF indicator at the end of the BGZF'ed VCF's
+    // header and we are writing a merged BGZF'ed VCF, this leads to the reader
+    // seeing an EOF at the end of the header and reading no records from the
+    // vcf. if we call flush and close separately, this behavior doesn't occur.
+    if (isCompressed && noEof) {
+      os.flush()
+      fos.close()
+    } else {
+      vcw.close()
+    }
}
}
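
The EOF handling above matters because BGZF readers treat the 28-byte empty terminator block as end-of-stream. As a sanity check on a finished file, htsjdk can report its termination state; a small sketch, assuming a locally written output file (the path is a placeholder):

```scala
import java.io.File

import htsjdk.samtools.util.BlockCompressedInputStream

// Inspect the BGZF termination state of a finished file;
// "out.vcf.bgz" is a placeholder for a locally written file.
val termination = BlockCompressedInputStream.checkTermination(new File("out.vcf.bgz"))
println(termination) // HAS_TERMINATOR_BLOCK, HAS_HEALTHY_LAST_BLOCK, or DEFECTIVE
```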
@@ -45,33 +45,6 @@ class ADAMVCFOutputFormat[K] extends KeyIgnoringVCFOutputFormat[K](VCFFormat.VCF
readHeaderFrom(path, FileSystem.get(conf))

// return record writer
-    return new KeyIgnoringVCFRecordWriter[K](getDefaultWorkFile(context, ""),
-      header,
-      true,
-      context);
-  }
-}
-
-/**
- * Wrapper for Hadoop-BAM to work around requirement for no-args constructor.
- *
- * @tparam K The key type. Keys are not written.
- */
-class ADAMHeaderlessVCFOutputFormat[K] extends KeyIgnoringVCFOutputFormat[K](VCFFormat.VCF) with Serializable {
-
-  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, VariantContextWritable] = {
-    val conf = context.getConfiguration()
-
-    // where is our header file?
-    val path = new Path(conf.get("org.bdgenomics.adam.rdd.variant.vcf_header_path"))
-
-    // read the header file
-    readHeaderFrom(path, FileSystem.get(conf))
-
-    // return record writer
-    return new KeyIgnoringVCFRecordWriter[K](getDefaultWorkFile(context, ""),
-      header,
-      false,
-      context);
+    super.getRecordWriter(context)
}
}
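
With the headerless variant gone, whether shards carry a header is toggled through Hadoop-BAM configuration rather than a separate output format class, as the single-file code path later in this diff does. A minimal sketch of the relevant settings, with the property name and value taken from this diff; the header path is a placeholder:

```scala
import org.apache.hadoop.conf.Configuration
import org.seqdoop.hadoop_bam.KeyIgnoringVCFOutputFormat

val conf = new Configuration()
// point the output format at the already-written header file
// ("/tmp/_header" is a placeholder path)
conf.set("org.bdgenomics.adam.rdd.variant.vcf_header_path", "/tmp/_header")
// suppress per-shard headers when shards will be concatenated behind
// a single merged header
conf.setBoolean(KeyIgnoringVCFOutputFormat.WRITE_HEADER_PROPERTY, false)
```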
@@ -250,7 +250,9 @@ sealed abstract class GenotypeRDD extends MultisampleAvroGenomicRDD[Genotype, Ge
// write vcf headers to file
VCFHeaderUtils.write(new VCFHeader(headerLines.toSet),
new Path("%s/_header".format(filePath)),
-      rdd.context.hadoopConfiguration)
+      rdd.context.hadoopConfiguration,
+      false,
+      false)
}

override protected def saveMetadata(filePath: String): Unit = {
@@ -18,9 +18,12 @@
package org.bdgenomics.adam.rdd.variant

import htsjdk.samtools.ValidationStringency
import htsjdk.samtools.util.BlockCompressedOutputStream
import htsjdk.variant.vcf.{ VCFHeader, VCFHeaderLine }
-import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.{
DefaultHeaderLines,
@@ -39,13 +42,18 @@ import org.bdgenomics.adam.rdd.{
MultisampleGenomicRDD,
VCFHeaderUtils
}
import org.bdgenomics.adam.util.FileExtensions
import org.bdgenomics.formats.avro.Sample
import org.bdgenomics.utils.misc.Logging
import org.bdgenomics.utils.interval.array.{
IntervalArray,
IntervalArraySerializer
}
import org.seqdoop.hadoop_bam._
import org.seqdoop.hadoop_bam.util.{
BGZFCodec,
BGZFEnhancedGzipCodec
}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -211,6 +219,8 @@ case class VariantContextRDD(rdd: RDD[VariantContext],
* Converts an RDD of ADAM VariantContexts to HTSJDK VariantContexts
* and saves to disk as VCF.
*
* Files that end in .gz or .bgz will be saved as block GZIP compressed VCFs.
*
* @param filePath The filepath to save to.
* @param asSingleFile If true, saves the output as a single file by merging
* the sharded output after completing the write to HDFS. If false, the
@@ -227,8 +237,15 @@
deferMerging: Boolean,
disableFastConcat: Boolean,
stringency: ValidationStringency) {
-    val vcfFormat = VCFFormat.inferFromFilePath(filePath)
-    assert(vcfFormat == VCFFormat.VCF, "BCF not yet supported") // TODO: Add BCF support

+    // TODO: Add BCF support
+    val vcfFormat = VCFFormat.VCF
+    val isBgzip = FileExtensions.isGzip(filePath)
+    if (!FileExtensions.isVcfExt(filePath)) {
+      throw new IllegalArgumentException(
+        "Saw non-VCF extension for file %s. Extensions supported are .vcf<.gz, .bgz>"
+          .format(filePath))
+    }

log.info(s"Writing $vcfFormat file to $filePath")

@@ -262,32 +279,48 @@
// write vcf header
VCFHeaderUtils.write(header,
headPath,
-      fs)
+      fs,
+      isBgzip,
+      asSingleFile)

// set path to header file and the vcf format
conf.set("org.bdgenomics.adam.rdd.variant.vcf_header_path", headPath.toString)
conf.set(VCFOutputFormat.OUTPUT_VCF_FORMAT_PROPERTY, vcfFormat.toString)
if (isBgzip) {
conf.setStrings("io.compression.codecs",
classOf[BGZFCodec].getCanonicalName,
classOf[BGZFEnhancedGzipCodec].getCanonicalName)
conf.setBoolean(FileOutputFormat.COMPRESS, true)
conf.setClass(FileOutputFormat.COMPRESS_CODEC,
classOf[BGZFCodec], classOf[CompressionCodec])
}

if (asSingleFile) {

// disable writing header
conf.setBoolean(KeyIgnoringVCFOutputFormat.WRITE_HEADER_PROPERTY,
false)

// write shards to disk
val tailPath = "%s_tail".format(filePath)
writableVCs.saveAsNewAPIHadoopFile(
tailPath,
classOf[LongWritable],
classOf[VariantContextWritable],
-        classOf[ADAMHeaderlessVCFOutputFormat[LongWritable]],
+        classOf[ADAMVCFOutputFormat[LongWritable]],
conf
)

// optionally merge
if (!deferMerging) {

FileMerger.mergeFiles(rdd.context,
fs,
new Path(filePath),
new Path(tailPath),
Some(headPath),
-          disableFastConcat = disableFastConcat)
+          disableFastConcat = disableFastConcat,
+          writeEmptyGzipBlock = isBgzip)
}
} else {

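The `writeEmptyGzipBlock = isBgzip` flag asks the merger to terminate the merged file properly: since the header and shards were written without EOF blocks, exactly one BGZF terminator must land at the very end. A sketch of the idea using htsjdk's canned terminator bytes; `out` is a hypothetical, already-open stream into which the header and shards have been concatenated, not ADAM's actual FileMerger internals:

```scala
import htsjdk.samtools.util.BlockCompressedStreamConstants
import org.apache.hadoop.fs.FSDataOutputStream

// After concatenating the header and all shards into `out`, append the
// 28-byte empty BGZF block so readers see a well-terminated file.
// `out` is a hypothetical FSDataOutputStream for this sketch.
def finishBgzf(out: FSDataOutputStream): Unit = {
  out.write(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)
  out.close()
}
```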
@@ -234,7 +234,9 @@ sealed abstract class VariantRDD extends AvroGenomicRDD[Variant, VariantProduct,
// write vcf headers to file
VCFHeaderUtils.write(new VCFHeader(headerLines.toSet),
new Path("%s/_header".format(filePath)),
-      rdd.context.hadoopConfiguration)
+      rdd.context.hadoopConfiguration,
+      false,
+      false)
}

override protected def saveMetadata(filePath: String): Unit = {
@@ -115,10 +115,18 @@ private[adam] object FileExtensions {
def isVcfExt(pathName: String): Boolean = {
pathName.endsWith(".vcf") ||
pathName.endsWith(".vcf.gz") ||
pathName.endsWith(".vcf.bgzf") ||
pathName.endsWith(".vcf.bgz")
}

/**
* @param pathName The path name to match.
* @return Returns true if the path name matches a GZIP format file extension.
*/
def isGzip(pathName: String): Boolean = {
pathName.endsWith(".gz") ||
pathName.endsWith(".bgz")
}

/**
* @param pathName The path name to match.
* @return Returns true if the path name matches an HTSJDK sequence dictionary (.dict) extension.
@@ -216,4 +216,43 @@ class VariantContextRDDSuite extends ADAMFunSuite {

testMetadata(sc.loadVcf(testFile("small.vcf")))
}

sparkTest("save sharded bgzip vcf") {
val smallVcf = testFile("bqsr1.vcf")
val rdd: VariantContextRDD = sc.loadVcf(smallVcf)
val outputPath = tmpFile("bqsr1.vcf.bgz")
rdd.transform(_.repartition(4)).saveAsVcf(outputPath,
asSingleFile = false,
deferMerging = false,
disableFastConcat = false,
stringency = ValidationStringency.STRICT)

assert(sc.loadVcf(outputPath).rdd.count === 681)
}

sparkTest("save bgzip vcf as single file") {
val smallVcf = testFile("small.vcf")
val rdd: VariantContextRDD = sc.loadVcf(smallVcf)
val outputPath = tmpFile("small.vcf.bgz")
rdd.saveAsVcf(outputPath,
asSingleFile = true,
deferMerging = false,
disableFastConcat = false,
stringency = ValidationStringency.STRICT)

assert(sc.loadVcf(outputPath).rdd.count === 6)
}

sparkTest("can't save file with non-vcf extension") {
val smallVcf = testFile("small.vcf")
val rdd: VariantContextRDD = sc.loadVcf(smallVcf)

intercept[IllegalArgumentException] {
rdd.saveAsVcf("small.bcf",
asSingleFile = false,
deferMerging = false,
disableFastConcat = false,
stringency = ValidationStringency.STRICT)
}
}
}
@@ -0,0 +1,39 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.util

import org.scalatest.FunSuite

class FileExtensionsSuite extends FunSuite {

test("ends in gzip extension") {
assert(FileExtensions.isGzip("file.vcf.gz"))
assert(FileExtensions.isGzip("file.fastq.bgz"))
assert(!FileExtensions.isGzip("file.fastq.bgzf"))
assert(!FileExtensions.isGzip("file.vcf"))
assert(!FileExtensions.isGzip("file.fastq"))
}

test("is a vcf extension") {
assert(FileExtensions.isVcfExt("file.vcf"))
assert(FileExtensions.isVcfExt("file.vcf.bgz"))
assert(!FileExtensions.isVcfExt("file.bcf"))
assert(FileExtensions.isVcfExt("file.vcf.gz"))
assert(!FileExtensions.isVcfExt("file.vcf.bgzf"))
}
}
