From 467db1ff993dbbbb1746a6c897c5165e32b618f3 Mon Sep 17 00:00:00 2001
From: Frank Austin Nothaft
Date: Sun, 25 Jun 2017 19:26:29 -0700
Subject: [PATCH] [ADAM-1438] Add ability to save FASTA back as a single file.

Resolves #1438. To support saving FASTA as a single file, moved the
`writeTextRdd` method from `FeatureRDD` up to `GenomicRDD` and widened its
access from `private` to `protected`.
---
 .../adam/api/java/JavaADAMContigConduit.java  |  2 +-
 .../org/bdgenomics/adam/rdd/GenomicRDD.scala  | 39 ++++++++++++++++++-
 .../contig/NucleotideContigFragmentRDD.scala  | 25 ++++++++++--
 .../adam/rdd/feature/FeatureRDD.scala         | 37 ------------------
 .../NucleotideContigFragmentRDDSuite.scala    |  8 ++++
 5 files changed, 68 insertions(+), 43 deletions(-)

diff --git a/adam-apis/src/test/java/org/bdgenomics/adam/api/java/JavaADAMContigConduit.java b/adam-apis/src/test/java/org/bdgenomics/adam/api/java/JavaADAMContigConduit.java
index 64a1021153..1c32b1b461 100644
--- a/adam-apis/src/test/java/org/bdgenomics/adam/api/java/JavaADAMContigConduit.java
+++ b/adam-apis/src/test/java/org/bdgenomics/adam/api/java/JavaADAMContigConduit.java
@@ -34,7 +34,7 @@ public static NucleotideContigFragmentRDD conduit(final NucleotideContigFragment
         // make temp directory and save file
         Path tempDir = Files.createTempDirectory("javaAC");
         String fileName = tempDir.toString() + "/testRdd.contig.adam";
-        recordRdd.save(fileName);
+        recordRdd.save(fileName, true);
 
         // create a new adam context and load the file
         JavaADAMContext jac = new JavaADAMContext(ac);
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
index cfd35e80e3..81ed431b8f 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -21,7 +21,7 @@ import htsjdk.variant.vcf.{ VCFHeader, VCFHeaderLine }
 import java.nio.file.Paths
 import htsjdk.samtools.ValidationStringency
 import org.apache.avro.generic.IndexedRecord
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{ FileSystem, Path }
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkFiles
 import org.apache.spark.api.java.JavaRDD
@@ -1212,6 +1212,43 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
       Iterator(Some((firstRegion._1, lastRegion._1)))
     }
   }
+
+  /**
+   * Writes an RDD to disk as text and optionally merges.
+   *
+   * @param rdd RDD to save.
+   * @param outputPath Output path to save text files to.
+   * @param asSingleFile If true, combines all partition shards.
+   * @param disableFastConcat If asSingleFile is true, disables the use of the
+   *   parallel file merging engine.
+   * @param optHeaderPath If provided, the header file to include.
+   */
+  protected def writeTextRdd[T](rdd: RDD[T],
+                                outputPath: String,
+                                asSingleFile: Boolean,
+                                disableFastConcat: Boolean,
+                                optHeaderPath: Option[String] = None) {
+    if (asSingleFile) {
+
+      // write rdd to disk
+      val tailPath = "%s_tail".format(outputPath)
+      rdd.saveAsTextFile(tailPath)
+
+      // get the filesystem impl
+      val fs = FileSystem.get(rdd.context.hadoopConfiguration)
+
+      // and then merge
+      FileMerger.mergeFiles(rdd.context,
+        fs,
+        new Path(outputPath),
+        new Path(tailPath),
+        disableFastConcat = disableFastConcat,
+        optHeaderPath = optHeaderPath.map(p => new Path(p)))
+    } else {
+      assert(optHeaderPath.isEmpty)
+      rdd.saveAsTextFile(outputPath)
+    }
+  }
 }
 
 private case class GenericGenomicRDD[T](
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
index 6184c6f337..7f33ca1ab6 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDD.scala
@@ -234,10 +234,14 @@ sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicRDD[Nucleot
    * to Parquet. Defaults to 60 character line length, if saving to FASTA.
    *
    * @param fileName file name
+   * @param asSingleFile If false, writes file to disk as shards with
+   *   one shard per partition. If true, we save the file to disk as a single
+   *   file by merging the shards.
    */
-  def save(fileName: java.lang.String) {
+  def save(fileName: java.lang.String,
+           asSingleFile: java.lang.Boolean) {
     if (fileName.endsWith(".fa") || fileName.endsWith(".fasta")) {
-      saveAsFasta(fileName)
+      saveAsFasta(fileName, asSingleFile = asSingleFile)
     } else {
       saveAsParquet(new JavaSaveArgs(fileName))
     }
@@ -248,8 +252,16 @@ sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicRDD[Nucleot
    *
    * @param fileName file name
    * @param lineWidth hard wrap FASTA formatted sequence at line width, default 60
+   * @param asSingleFile By default (false), writes file to disk as shards with
+   *   one shard per partition. If true, we save the file to disk as a single
+   *   file by merging the shards.
+   * @param disableFastConcat If asSingleFile is true, disables the use of the
+   *   parallel file merging engine.
    */
-  def saveAsFasta(fileName: String, lineWidth: Int = 60) {
+  def saveAsFasta(fileName: String,
+                  lineWidth: Int = 60,
+                  asSingleFile: Boolean = false,
+                  disableFastConcat: Boolean = false) {
     def isFragment(record: NucleotideContigFragment): Boolean = {
       Option(record.getIndex).isDefined &&
         Option(record.getFragments).fold(false)(_ > 1)
@@ -270,7 +282,12 @@ sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicRDD[Nucleot
       sb.toString
     }
 
-    rdd.map(toFasta).saveAsTextFile(fileName)
+    val asFasta = rdd.map(toFasta)
+
+    writeTextRdd(asFasta,
+      fileName,
+      asSingleFile,
+      disableFastConcat)
   }
 
   /**
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
index f46c4b5ea4..3a2f334e43 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureRDD.scala
@@ -434,43 +434,6 @@ sealed abstract class FeatureRDD extends AvroGenomicRDD[Feature, FeatureProduct,
     Seq(ReferenceRegion.unstranded(elem))
   }
 
-  /**
-   * Writes an RDD to disk as text and optionally merges.
-   *
-   * @param rdd RDD to save.
-   * @param outputPath Output path to save text files to.
-   * @param asSingleFile If true, combines all partition shards.
-   * @param disableFastConcat If asSingleFile is true, disables the use of the
-   *   parallel file merging engine.
-   * @param optHeaderPath If provided, the header file to include.
-   */
-  private def writeTextRdd[T](rdd: RDD[T],
-                              outputPath: String,
-                              asSingleFile: Boolean,
-                              disableFastConcat: Boolean,
-                              optHeaderPath: Option[String] = None) {
-    if (asSingleFile) {
-
-      // write rdd to disk
-      val tailPath = "%s_tail".format(outputPath)
-      rdd.saveAsTextFile(tailPath)
-
-      // get the filesystem impl
-      val fs = FileSystem.get(rdd.context.hadoopConfiguration)
-
-      // and then merge
-      FileMerger.mergeFiles(rdd.context,
-        fs,
-        new Path(outputPath),
-        new Path(tailPath),
-        disableFastConcat = disableFastConcat,
-        optHeaderPath = optHeaderPath.map(p => new Path(p)))
-    } else {
-      assert(optHeaderPath.isEmpty)
-      rdd.saveAsTextFile(outputPath)
-    }
-  }
-
   /**
    * Save this FeatureRDD in GTF format.
    *
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala
index 259cc6a9e8..c333b9724c 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentRDDSuite.scala
@@ -56,6 +56,14 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {
     assert(fragments3.dataset.count === 8L)
   }
 
+  sparkTest("save fasta back as a single file") {
+    val origFasta = testFile("artificial.fa")
+    val tmpFasta = tmpFile("test.fa")
+    sc.loadFasta(origFasta)
+      .saveAsFasta(tmpFasta, asSingleFile = true, lineWidth = 70)
+    checkFiles(origFasta, tmpFasta)
+  }
+
   sparkTest("generate sequence dict from fasta") {
     val contig1 = Contig.newBuilder