Skip to content

Commit

Permalink
[ADAM-1438] Add ability to save FASTA back as a single file.
Browse files Browse the repository at this point in the history
Resolves #1438. To support saving FASTA as a single file, moved the
private `writeTextRdd` method from `FeatureRDD` up to `GenomicRDD`, and widened
its access modifier from `private` to `protected`.
  • Loading branch information
fnothaft authored and heuermh committed Jul 11, 2017
1 parent 7a11d9e commit 467db1f
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static NucleotideContigFragmentRDD conduit(final NucleotideContigFragment
// make temp directory and save file
Path tempDir = Files.createTempDirectory("javaAC");
String fileName = tempDir.toString() + "/testRdd.contig.adam";
recordRdd.save(fileName);
recordRdd.save(fileName, true);

// create a new adam context and load the file
JavaADAMContext jac = new JavaADAMContext(ac);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import htsjdk.variant.vcf.{ VCFHeader, VCFHeaderLine }
import java.nio.file.Paths
import htsjdk.samtools.ValidationStringency
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkFiles
import org.apache.spark.api.java.JavaRDD
Expand Down Expand Up @@ -1212,6 +1212,43 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
Iterator(Some((firstRegion._1, lastRegion._1)))
}
}

/**
 * Writes an RDD to disk as text and optionally merges the shards.
 *
 * @param rdd RDD to save.
 * @param outputPath Output path to save text files to.
 * @param asSingleFile If true, combines all partition shards into a single
 *   file via the file merging engine.
 * @param disableFastConcat If asSingleFile is true, disables the use of the
 *   parallel file merging engine.
 * @param optHeaderPath If provided, the header file to prepend to the merged
 *   output. Only valid when asSingleFile is true.
 */
protected def writeTextRdd[T](rdd: RDD[T],
                              outputPath: String,
                              asSingleFile: Boolean,
                              disableFastConcat: Boolean,
                              optHeaderPath: Option[String] = None) {
  if (asSingleFile) {

    // write rdd to disk as sharded text files under a temporary "tail" path
    val tailPath = "%s_tail".format(outputPath)
    rdd.saveAsTextFile(tailPath)

    // resolve the filesystem from the output path itself, not the default
    // filesystem: the output may live on a non-default scheme (e.g., s3a://
    // while the default FS is hdfs://), and FileSystem.get(conf) would then
    // return the wrong filesystem for the merge
    val fs = new Path(outputPath).getFileSystem(rdd.context.hadoopConfiguration)

    // and then merge the shards (plus optional header) into a single file
    FileMerger.mergeFiles(rdd.context,
      fs,
      new Path(outputPath),
      new Path(tailPath),
      disableFastConcat = disableFastConcat,
      optHeaderPath = optHeaderPath.map(p => new Path(p)))
  } else {
    // a header can only be attached during a merge; passing one without
    // asSingleFile is a programmer error
    assert(optHeaderPath.isEmpty)
    rdd.saveAsTextFile(outputPath)
  }
}
}

private case class GenericGenomicRDD[T](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,14 @@ sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicRDD[Nucleot
* to Parquet. Defaults to 60 character line length, if saving to FASTA.
*
* @param fileName file name
* @param asSingleFile If false, writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
*/
def save(fileName: java.lang.String) {
def save(fileName: java.lang.String,
asSingleFile: java.lang.Boolean) {
if (fileName.endsWith(".fa") || fileName.endsWith(".fasta")) {
saveAsFasta(fileName)
saveAsFasta(fileName, asSingleFile = asSingleFile)
} else {
saveAsParquet(new JavaSaveArgs(fileName))
}
Expand All @@ -248,8 +252,16 @@ sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicRDD[Nucleot
*
* @param fileName file name
* @param lineWidth hard wrap FASTA formatted sequence at line width, default 60
* @param asSingleFile By default (false), writes file to disk as shards with
* one shard per partition. If true, we save the file to disk as a single
* file by merging the shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
*/
def saveAsFasta(fileName: String, lineWidth: Int = 60) {
def saveAsFasta(fileName: String,
lineWidth: Int = 60,
asSingleFile: Boolean = false,
disableFastConcat: Boolean = false) {

def isFragment(record: NucleotideContigFragment): Boolean = {
Option(record.getIndex).isDefined && Option(record.getFragments).fold(false)(_ > 1)
Expand All @@ -270,7 +282,12 @@ sealed abstract class NucleotideContigFragmentRDD extends AvroGenomicRDD[Nucleot
sb.toString
}

rdd.map(toFasta).saveAsTextFile(fileName)
val asFasta = rdd.map(toFasta)

writeTextRdd(asFasta,
fileName,
asSingleFile,
disableFastConcat)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,43 +434,6 @@ sealed abstract class FeatureRDD extends AvroGenomicRDD[Feature, FeatureProduct,
Seq(ReferenceRegion.unstranded(elem))
}

/**
* Writes an RDD to disk as text and optionally merges.
*
* @param rdd RDD to save.
* @param outputPath Output path to save text files to.
* @param asSingleFile If true, combines all partition shards.
* @param disableFastConcat If asSingleFile is true, disables the use of the
* parallel file merging engine.
* @param optHeaderPath If provided, the header file to include.
*/
private def writeTextRdd[T](rdd: RDD[T],
outputPath: String,
asSingleFile: Boolean,
disableFastConcat: Boolean,
optHeaderPath: Option[String] = None) {
if (asSingleFile) {

// write rdd to disk as sharded text under a temporary "tail" path
val tailPath = "%s_tail".format(outputPath)
rdd.saveAsTextFile(tailPath)

// get the filesystem impl
// NOTE(review): FileSystem.get(conf) returns the *default* filesystem;
// if outputPath may use a non-default scheme (e.g. s3a:// vs. hdfs://),
// new Path(outputPath).getFileSystem(conf) would be safer — confirm
val fs = FileSystem.get(rdd.context.hadoopConfiguration)

// and then merge the shards (plus optional header) into outputPath
FileMerger.mergeFiles(rdd.context,
fs,
new Path(outputPath),
new Path(tailPath),
disableFastConcat = disableFastConcat,
optHeaderPath = optHeaderPath.map(p => new Path(p)))
} else {
// a header only makes sense when merging into a single file
assert(optHeaderPath.isEmpty)
rdd.saveAsTextFile(outputPath)
}
}

/**
* Save this FeatureRDD in GTF format.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ class NucleotideContigFragmentRDDSuite extends ADAMFunSuite {
assert(fragments3.dataset.count === 8L)
}

sparkTest("save fasta back as a single file") {
  // round-trip check: load a FASTA, write it back merged into one file,
  // and verify the output matches the input byte-for-byte
  val inputPath = testFile("artificial.fa")
  val outputPath = tmpFile("test.fa")
  val contigs = sc.loadFasta(inputPath)
  contigs.saveAsFasta(outputPath, asSingleFile = true, lineWidth = 70)
  checkFiles(inputPath, outputPath)
}

sparkTest("generate sequence dict from fasta") {

val contig1 = Contig.newBuilder
Expand Down

0 comments on commit 467db1f

Please sign in to comment.