Adding loadPairedFastqAsFragments method #1828

Closed
@@ -46,6 +46,7 @@ object Timers extends Metrics {
val LoadIndexedVcf = timer("Load indexed VCF format")
val LoadInterleavedFastq = timer("Load interleaved FASTQ format")
val LoadInterleavedFastqFragments = timer("Load interleaved FASTQ format as Fragments")
val LoadPairedFastqFragments = timer("Load paired FASTQ format as Fragments")
val LoadIntervalList = timer("Load IntervalList format")
val LoadNarrowPeak = timer("Load NarrowPeak format")
val LoadPairedFastq = timer("Load paired FASTQ format")
52 changes: 52 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -2306,6 +2306,58 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
FragmentRDD.fromRdd(records.map(fastqRecordConverter.convertFragment))
}

/**
 * Load paired unaligned alignment records grouped by sequencing fragment
 * from paired FASTQ files into a FragmentRDD.
 *
 * In paired FASTQ, the two reads of each pair from a paired-end sequencing
 * protocol are stored in separate files.
 *
 * Fragments represent all of the reads from a single sequenced fragment as
 * a single object, which is a useful representation for some tasks.
 *
 * @param pathNameR1 The path name to load the first-of-pair unaligned
 * alignment records from. Globs/directories are supported.
 * @param pathNameR2 The path name to load the second-of-pair unaligned
 * alignment records from. Globs/directories are supported.
 * @param recordGroup The record group to attach to the loaded fragments.
 * @return Returns a FragmentRDD containing the paired reads grouped by
 * sequencing fragment.
 */
def loadPairedFastqAsFragments(pathNameR1: String,
pathNameR2: String,
recordGroup: RecordGroup): FragmentRDD = LoadPairedFastqFragments.time {

def readFastq(path: String) = {
val job = HadoopUtil.newJob(sc)
val conf = ContextUtil.getConfiguration(job)
conf.setStrings("io.compression.codecs",
classOf[BGZFCodec].getCanonicalName,
classOf[BGZFEnhancedGzipCodec].getCanonicalName)
val file = sc.newAPIHadoopFile(
path,
classOf[SingleFastqInputFormat],
classOf[Void],
classOf[Text],
conf
)
if (Metrics.isRecording) file.instrument()
else file
}
val recordsR1 = readFastq(pathNameR1)
val recordsR2 = readFastq(pathNameR2)

// convert records
val fastqRecordConverter = new FastqRecordConverter
// Zip will fail if R1 and R2 have a different number of reads.
// An explicit count check, as in loadPairedFastq, is therefore not required and no longer blocks.
val rdd = recordsR1.zip(recordsR2)
.map {
case (r1, r2) =>
val pairText = new Text(r1._2.toString + r2._2.toString)
Member:

Note zipping the records together assumes that they are sorted in the same order in each file.

Contributor Author:

Yes indeed, this should always be the case for a pair of FASTQ files. When it is not, the FASTQ files are corrupt, at least as paired reads.
Adding a check would add more computation, which seems like a waste.

fastqRecordConverter.convertFragment((null, pairText))
}
FragmentRDD(rdd, SequenceDictionary.empty, RecordGroupDictionary(Seq(recordGroup)), Seq.empty)
}
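The review discussion above notes that zipping the two RDDs assumes the records are sorted in the same order in each file. If a cheap per-pair sanity check were ever wanted, it could compare read names after stripping the pair suffix. A minimal plain-Scala sketch, assuming conventional Illumina-style headers (`@name/1`, `@name/2`, or `@name 1:...`); `PairCheck` and its methods are hypothetical helpers, not part of ADAM:

```scala
// Hypothetical helper: verifies that two FASTQ header lines describe
// reads of the same sequenced fragment.
object PairCheck {
  /** Extract the fragment name from a header such as "@frag1/1" or "@frag1 1:N:0:ACGT". */
  def fragmentName(header: String): String =
    header.stripPrefix("@")        // drop the FASTQ header marker
      .split("\\s+")(0)            // keep only the name token
      .replaceAll("/[12]$", "")    // strip a trailing /1 or /2 pair suffix

  /** True if the two headers belong to the same fragment. */
  def sameFragment(headerR1: String, headerR2: String): Boolean =
    fragmentName(headerR1) == fragmentName(headerR2)
}
```

Such a check could be run inside the `map` over zipped pairs, at the cost of the extra per-record string work the author wants to avoid.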

/**
* Load features into a FeatureRDD and convert to a CoverageRDD.
* Coverage is stored in the score field of Feature.
@@ -271,6 +271,13 @@ class ADAMContextSuite extends ADAMFunSuite {
})
}

sparkTest("load paired fastq as fragments") {
val pathR1 = testFile("proper_pairs_1.fq")
val pathR2 = testFile("proper_pairs_2.fq")
// the method signature takes a RecordGroup; the names here are illustrative
val fragments = sc.loadPairedFastqAsFragments(pathR1, pathR2, RecordGroup("sample", "rg"))
assert(fragments.rdd.count === 3)
}
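The count the test asserts follows from the zip semantics of the new method: every 4-line FASTQ record in R1 pairs with the record at the same position in R2 to form one fragment. A self-contained plain-Scala sketch of that arithmetic; `ZipFragments` and the inline data are illustrative, not the contents of `proper_pairs_1.fq`/`proper_pairs_2.fq`:

```scala
// Sketch of the zip semantics the test relies on: each FASTQ record
// spans 4 lines (header, sequence, separator, qualities), and record i
// of R1 combines with record i of R2 into one fragment.
object ZipFragments {
  /** Split raw FASTQ text into 4-line records. */
  def records(fastq: String): Seq[Seq[String]] =
    fastq.trim.split("\n").toSeq.grouped(4).toSeq

  /** Number of fragments formed by zipping two FASTQ inputs. */
  def fragmentCount(r1: String, r2: String): Int =
    records(r1).zip(records(r2)).size
}
```

Note that `Seq.zip`, unlike Spark's `RDD.zip`, silently truncates to the shorter side; the RDD version fails on mismatched counts, which is the behavior the comment in `loadPairedFastqAsFragments` depends on.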

(1 to 4) foreach { testNumber =>
val inputName = "interleaved_fastq_sample%d.ifq".format(testNumber)
val path = testFile(inputName)