Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[ADAM-405] Add FASTQ output. #408

Merged
merged 1 commit into from
Oct 10, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class TransformArgs extends Args4jBase with ParquetArgs {
var repartition: Int = -1
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1
@Args4jOption(required = false, name = "-sort_fastq_output", usage = "Sets whether to sort the FASTQ output, if saving as FASTQ. False by default. Ignored if not saving as FASTQ.")
var sortFastqOutput: Boolean = false
}

class Transform(protected val args: TransformArgs) extends ADAMSparkCommand[TransformArgs] with Logging {
Expand Down Expand Up @@ -133,11 +135,14 @@ class Transform(protected val args: TransformArgs) extends ADAMSparkCommand[Tran
} else if (args.outputPath.endsWith(".bam")) {
log.info("Saving data in BAM format")
adamRecords.adamSAMSave(args.outputPath, asSam = false)
} else if (args.outputPath.endsWith(".fq") || args.outputPath.endsWith(".fastq") ||
args.outputPath.endsWith(".ifq")) {
log.info("Saving data in FASTQ format.")
adamRecords.adamSaveAsFastq(args.outputPath, args.sortFastqOutput)
} else {
log.info("Saving data in ADAM format")
adamRecords.adamSave(args.outputPath, blockSize = args.blockSize, pageSize = args.pageSize,
compressCodec = args.compressionCodec, disableDictionaryEncoding = args.disableDictionary)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@ import org.bdgenomics.formats.avro.AlignmentRecord

class AlignmentRecordConverter extends Serializable {

/**
 * Renders one read as a four-line FASTQ entry.
 *
 * The lines, in order, are:
 *  1. an at-sign immediately followed by the read name
 *  1. the sequence
 *  1. a bare plus sign (the optional read name that FASTQ permits after the
 *     plus sign is not written)
 *  1. the ASCII quality scores
 *
 * Lines are joined with a newline; no trailing newline is appended.
 *
 * @param adamRecord Read to convert to FASTQ.
 * @return Returns this read in string form.
 */
def convertToFastq(adamRecord: AlignmentRecord): String = {
  // pull the three fields in output order, then stitch them together
  val name = adamRecord.getReadName
  val bases = adamRecord.getSequence
  val quals = adamRecord.getQual

  s"@$name\n$bases\n+\n$quals"
}

/**
* Converts a single ADAM record into a SAM record.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,26 @@ class ADAMAlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) extends ADAMSeq
def adamTrimLowQualityReadGroups(phredThreshold: Int = 20): RDD[AlignmentRecord] =
  // pure delegation: the wrapped RDD and the threshold go straight to TrimReads
  TrimReads(rdd, phredThreshold)

/**
 * Saves reads in FASTQ format.
 *
 * Each read is converted to its FASTQ text with
 * AlignmentRecordConverter.convertToFastq and the resulting strings are
 * written out with the RDD's saveAsTextFile.
 *
 * @param fileName Path to save files at; passed unchanged to saveAsTextFile.
 * @param sort Whether to sort the FASTQ files by read name or not. Defaults
 * to false. Sorting the output will recover pair order, if desired. The sort
 * key is the read name converted to a String, so when sorting is enabled
 * every read is expected to carry a read name.
 */
def adamSaveAsFastq(fileName: String, sort: Boolean = false): Unit = {
  val arc = new AlignmentRecordConverter

  // sort the rdd if desired; otherwise keep the existing record order
  val outputRdd = if (sort) rdd.sortBy(_.getReadName.toString) else rdd

  // convert the rdd and save as a text file
  outputRdd
    .map(arc.convertToFastq)
    .saveAsTextFile(fileName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import org.scalatest.FunSuite

class AlignmentRecordConverterSuite extends FunSuite {

// allocate converters
val adamRecordConverter = new AlignmentRecordConverter

def make_read(start: Long, cigar: String, mdtag: String, length: Int, id: Int = 0): AlignmentRecord = {
val sequence: String = "A" * length
AlignmentRecord.newBuilder()
Expand Down Expand Up @@ -63,9 +66,6 @@ class AlignmentRecordConverterSuite extends FunSuite {
val dict = SequenceDictionary(seqRecForDict)
val readGroups = new RecordGroupDictionary(Seq())

// allocate converters
val adamRecordConverter = new AlignmentRecordConverter

// convert read
val toSAM = adamRecordConverter.convert(adamRead,
SAMFileHeaderWritable(adamRecordConverter.createSAMHeader(dict,
Expand All @@ -84,5 +84,21 @@ class AlignmentRecordConverterSuite extends FunSuite {
assert(toSAM.getAttribute("MD") === "2^AAA2")
}

test("convert a read to fastq") {
  // minimal read: only the three fields that the FASTQ conversion reads
  val read = AlignmentRecord.newBuilder()
    .setSequence("ACACCAACATG")
    .setQual(".+**.+;:**.")
    .setReadName("thebestread")
    .build()

  // one array element per FASTQ line
  val lines = adamRecordConverter.convertToFastq(read).split('\n')

  val header = lines(0)
  val bases = lines(1)
  val separator = lines(2)
  val quals = lines(3)

  assert(header === "@thebestread")
  assert(bases === "ACACCAACATG")
  assert(separator === "+")
  assert(quals === ".+**.+;:**.")
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,27 @@ class ADAMAlignmentRecordRDDFunctionsSuite extends SparkFunSuite {

sam.collect().foreach(r => assert(r.getReadMapped))
}

sparkTest("round trip from ADAM to FASTQ and back to ADAM produces equivalent Read values") {
  // load the interleaved FASTQ fixture from the test classpath
  val fixturePath = Thread.currentThread().getContextClassLoader.getResource("interleaved_fastq_sample1.fq").getFile
  val original: RDD[AlignmentRecord] = sc.adamLoad(fixturePath)

  // write the reads back out as FASTQ under a fresh temporary directory
  val outputPath = Files.createTempDirectory("reads12").toAbsolutePath.toString + "/reads12.fq"
  original.adamSaveAsFastq(outputPath)

  // read back what was just written
  val reloaded: RDD[AlignmentRecord] = sc.adamLoad(outputPath)

  assert(reloaded.count() === original.count())

  // the counts matched above, so pairing by position visits every read once
  val before = original.collect()
  val after = reloaded.collect()

  before.zip(after).foreach {
    case (readA, readB) =>
      assert(readA.getSequence === readB.getSequence)
      assert(readA.getQual === readB.getQual)
      assert(readA.getReadName === readB.getReadName)
  }
}
}