From e7d31c8f1024050372ceaa847c0ca8ba52bab36a Mon Sep 17 00:00:00 2001 From: Peter van 't Hof Date: Tue, 12 Dec 2017 20:22:49 +0100 Subject: [PATCH 1/7] Add init version of loadPairedFastqAsFragments method --- .../adam/instrumentation/Timers.scala | 1 + .../org/bdgenomics/adam/rdd/ADAMContext.scala | 50 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala index 0ce4a879cb..5890abc2bc 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala @@ -46,6 +46,7 @@ object Timers extends Metrics { val LoadIndexedVcf = timer("Load indexed VCF format") val LoadInterleavedFastq = timer("Load interleaved FASTQ format") val LoadInterleavedFastqFragments = timer("Load interleaved FASTQ format as Fragments") + val LoadPairedFastqFragments = timer("Load paired FASTQ format as Fragments") val LoadIntervalList = timer("Load IntervalList format") val LoadNarrowPeak = timer("Load NarrowPeak format") val LoadPairedFastq = timer("Load paired FASTQ format") diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index cf97109b26..9d21a90cfe 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -2306,6 +2306,56 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log FragmentRDD.fromRdd(records.map(fastqRecordConverter.convertFragment)) } + /** + * Load paired unaligned alignment records grouped by sequencing fragment + * from paired FASTQ files into an FragmentRDD. + * + * In paired FASTQ, the two reads from a paired sequencing protocol in separated files. 
+ * + * Fragments represent all of the reads from a single sequenced fragment as + * a single object, which is a useful representation for some tasks. + * + * @param pathNameR1 The path name to load unaligned alignment records from. + * Globs/directories are supported. + * @param pathNameR2 The path name to load unaligned alignment records from. + * Globs/directories are supported. + * @return Returns a FragmentRDD containing the paired reads grouped by + * sequencing fragment. + */ + def loadPairedFastqAsFragments(pathNameR1: String, pathNameR2: String): FragmentRDD = LoadPairedFastqFragments.time { + + val job = HadoopUtil.newJob(sc) + val conf = ContextUtil.getConfiguration(job) + conf.setStrings("io.compression.codecs", + classOf[BGZFCodec].getCanonicalName, + classOf[BGZFEnhancedGzipCodec].getCanonicalName) + val recordsR1 = sc.newAPIHadoopFile( + pathNameR1, + classOf[InterleavedFastqInputFormat], + classOf[Void], + classOf[Text], + conf + ) + if (Metrics.isRecording) recordsR1.instrument() else recordsR1 + val recordsR2 = sc.newAPIHadoopFile( + pathNameR2, + classOf[InterleavedFastqInputFormat], + classOf[Void], + classOf[Text], + conf + ) + if (Metrics.isRecording) recordsR2.instrument() else recordsR2 + + // convert records + val fastqRecordConverter = new FastqRecordConverter + val rdd = recordsR1.zip(recordsR2) + .flatMap { case (r1, r2) => + //TODO: check read ID's + List(fastqRecordConverter.convertFragment(r1), fastqRecordConverter.convertFragment(r2)) + } + FragmentRDD.fromRdd(rdd) + } + /** * Load features into a FeatureRDD and convert to a CoverageRDD. * Coverage is stored in the score field of Feature. 
From 10d04282a137f2b332031caa5058351c8f8de32b Mon Sep 17 00:00:00 2001 From: Peter van 't Hof Date: Tue, 12 Dec 2017 20:43:58 +0100 Subject: [PATCH 2/7] Added Validation method when files are not in sync --- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 53 ++++++++++++------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 9d21a90cfe..1742d29175 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -2307,22 +2307,24 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log } /** - * Load paired unaligned alignment records grouped by sequencing fragment - * from paired FASTQ files into an FragmentRDD. - * - * In paired FASTQ, the two reads from a paired sequencing protocol in separated files. - * - * Fragments represent all of the reads from a single sequenced fragment as - * a single object, which is a useful representation for some tasks. - * - * @param pathNameR1 The path name to load unaligned alignment records from. - * Globs/directories are supported. - * @param pathNameR2 The path name to load unaligned alignment records from. - * Globs/directories are supported. - * @return Returns a FragmentRDD containing the paired reads grouped by - * sequencing fragment. - */ - def loadPairedFastqAsFragments(pathNameR1: String, pathNameR2: String): FragmentRDD = LoadPairedFastqFragments.time { + * Load paired unaligned alignment records grouped by sequencing fragment + * from paired FASTQ files into a FragmentRDD. + * + * In paired FASTQ, the two reads from a paired sequencing protocol are stored in separate files. + * + * Fragments represent all of the reads from a single sequenced fragment as + * a single object, which is a useful representation for some tasks. 
+ * + * @param pathNameR1 The path name to load unaligned alignment records from. + * Globs/directories are supported. + * @param pathNameR2 The path name to load unaligned alignment records from. + * Globs/directories are supported. + * @return Returns a FragmentRDD containing the paired reads grouped by + * sequencing fragment. + */ + def loadPairedFastqAsFragments(pathNameR1: String, + pathNameR2: String, + stringency: ValidationStringency = ValidationStringency.STRICT): FragmentRDD = LoadPairedFastqFragments.time { val job = HadoopUtil.newJob(sc) val conf = ContextUtil.getConfiguration(job) @@ -2349,9 +2351,22 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log // convert records val fastqRecordConverter = new FastqRecordConverter val rdd = recordsR1.zip(recordsR2) - .flatMap { case (r1, r2) => - //TODO: check read ID's - List(fastqRecordConverter.convertFragment(r1), fastqRecordConverter.convertFragment(r2)) + .flatMap { + case (r1, r2) => + val r1Fragment = fastqRecordConverter.convertFragment(r1) + val r2Fragment = fastqRecordConverter.convertFragment(r2) + stringency match { + case ValidationStringency.STRICT | ValidationStringency.LENIENT => + val r1Name = r1Fragment.getReadName.split(" ").head.stripSuffix("/1") + val r2Name = r2Fragment.getReadName.split(" ").head.stripSuffix("/2") + if (r1Name != r2Name) { + val msg = s"Fastq 1 ($pathNameR1) and fastq 2 ($pathNameR2) are not in sync, order of the reads should be the same" + if (stringency == ValidationStringency.STRICT) + throw new IllegalArgumentException(msg) + else logError(msg) + } + } + List(r1Fragment, r2Fragment) } FragmentRDD.fromRdd(rdd) } From c1c5c5cf530f5330048380498cc8d31c50995334 Mon Sep 17 00:00:00 2001 From: Peter van 't Hof Date: Tue, 12 Dec 2017 20:54:38 +0100 Subject: [PATCH 3/7] Replace head --- .../src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 1742d29175..469012765e 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -2357,8 +2357,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log val r2Fragment = fastqRecordConverter.convertFragment(r2) stringency match { case ValidationStringency.STRICT | ValidationStringency.LENIENT => - val r1Name = r1Fragment.getReadName.split(" ").head.stripSuffix("/1") - val r2Name = r2Fragment.getReadName.split(" ").head.stripSuffix("/2") + val r1Name = r1Fragment.getReadName.takeWhile(_ != ' ').stripSuffix("/1") + val r2Name = r2Fragment.getReadName.takeWhile(_ != ' ').stripSuffix("/2") if (r1Name != r2Name) { val msg = s"Fastq 1 ($pathNameR1) and fastq 2 ($pathNameR2) are not in sync, order of the reads should be the same" if (stringency == ValidationStringency.STRICT) From eeb976be6f76ac29e212579283307c10bbea4302 Mon Sep 17 00:00:00 2001 From: Peter van 't Hof Date: Wed, 13 Dec 2017 09:31:00 +0100 Subject: [PATCH 4/7] Added stringency to scaladocs --- .../src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 469012765e..7910913816 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -2319,6 +2319,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * Globs/directories are supported. * @param pathNameR2 The path name to load unaligned alignment records from. * Globs/directories are supported. 
+ * @param stringency The validation stringency to use when validating paired FASTQ format. + * Defaults to ValidationStringency.STRICT. * @return Returns a FragmentRDD containing the paired reads grouped by * sequencing fragment. */ From 118d94d40957981a50ac319154f767f6d911900a Mon Sep 17 00:00:00 2001 From: Peter van 't Hof Date: Wed, 13 Dec 2017 09:49:08 +0100 Subject: [PATCH 5/7] Added a test --- .../scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala index 2dccfef85f..21aa455fbc 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala @@ -271,6 +271,13 @@ class ADAMContextSuite extends ADAMFunSuite { }) } + sparkTest("load paired fastq as fragments") { + val pathR1 = testFile("proper_pairs_1.fq") + val pathR2 = testFile("proper_pairs_2.fq") + val fragments = sc.loadPairedFastqAsFragments(pathR1, pathR2) + assert(fragments.rdd.count === 6) + } + (1 to 4) foreach { testNumber => val inputName = "interleaved_fastq_sample%d.ifq".format(testNumber) val path = testFile(inputName) From 043dc276d9ef1583ea956e25f16feab5213cfc65 Mon Sep 17 00:00:00 2001 From: Peter van 't Hof Date: Wed, 13 Dec 2017 13:26:08 +0100 Subject: [PATCH 6/7] Simplify pair check (done by converter already) --- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 7910913816..49c57c41c6 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -2353,22 +2353,10 @@ class 
ADAMContext(@transient val sc: SparkContext) extends Serializable with Log // convert records val fastqRecordConverter = new FastqRecordConverter val rdd = recordsR1.zip(recordsR2) - .flatMap { + .map { case (r1, r2) => - val r1Fragment = fastqRecordConverter.convertFragment(r1) - val r2Fragment = fastqRecordConverter.convertFragment(r2) - stringency match { - case ValidationStringency.STRICT | ValidationStringency.LENIENT => - val r1Name = r1Fragment.getReadName.takeWhile(_ != ' ').stripSuffix("/1") - val r2Name = r2Fragment.getReadName.takeWhile(_ != ' ').stripSuffix("/2") - if (r1Name != r2Name) { - val msg = s"Fastq 1 ($pathNameR1) and fastq 2 ($pathNameR2) are not in sync, order of the reads should be the same" - if (stringency == ValidationStringency.STRICT) - throw new IllegalArgumentException(msg) - else logError(msg) - } - } - List(r1Fragment, r2Fragment) + val pairText = new Text(r1._2.toString + r2._2.toString) + fastqRecordConverter.convertFragment((null, pairText)) } FragmentRDD.fromRdd(rdd) } From b363974f722714f8de7d2086cdb473d9a6e5966e Mon Sep 17 00:00:00 2001 From: Peter van 't Hof Date: Wed, 13 Dec 2017 15:53:33 +0100 Subject: [PATCH 7/7] Added readgroup --- .../org/bdgenomics/adam/rdd/ADAMContext.scala | 49 +++++++++---------- .../adam/rdd/ADAMContextSuite.scala | 2 +- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 49c57c41c6..2c91d58f2a 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -2319,46 +2319,43 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * Globs/directories are supported. * @param pathNameR2 The path name to load unaligned alignment records from. * Globs/directories are supported. 
- * @param stringency The validation stringency to use when validating paired FASTQ format. - * Defaults to ValidationStringency.STRICT. * @return Returns a FragmentRDD containing the paired reads grouped by * sequencing fragment. */ def loadPairedFastqAsFragments(pathNameR1: String, pathNameR2: String, - stringency: ValidationStringency = ValidationStringency.STRICT): FragmentRDD = LoadPairedFastqFragments.time { - - val job = HadoopUtil.newJob(sc) - val conf = ContextUtil.getConfiguration(job) - conf.setStrings("io.compression.codecs", - classOf[BGZFCodec].getCanonicalName, - classOf[BGZFEnhancedGzipCodec].getCanonicalName) - val recordsR1 = sc.newAPIHadoopFile( - pathNameR1, - classOf[InterleavedFastqInputFormat], - classOf[Void], - classOf[Text], - conf - ) - if (Metrics.isRecording) recordsR1.instrument() else recordsR1 - val recordsR2 = sc.newAPIHadoopFile( - pathNameR2, - classOf[InterleavedFastqInputFormat], - classOf[Void], - classOf[Text], - conf - ) - if (Metrics.isRecording) recordsR2.instrument() else recordsR2 + recordGroup: RecordGroup): FragmentRDD = LoadPairedFastqFragments.time { + + def readFastq(path: String) = { + val job = HadoopUtil.newJob(sc) + val conf = ContextUtil.getConfiguration(job) + conf.setStrings("io.compression.codecs", + classOf[BGZFCodec].getCanonicalName, + classOf[BGZFEnhancedGzipCodec].getCanonicalName) + val file = sc.newAPIHadoopFile( + path, + classOf[SingleFastqInputFormat], + classOf[Void], + classOf[Text], + conf + ) + if (Metrics.isRecording) file.instrument() + else file + } + val recordsR1 = readFastq(pathNameR1) + val recordsR2 = readFastq(pathNameR2) // convert records val fastqRecordConverter = new FastqRecordConverter + // Zip will fail if R1 and R2 have a different number of reads + // Checking this explicitly, as in loadPairedFastq, is therefore not required and no longer blocking val rdd = recordsR1.zip(recordsR2) .map { case (r1, r2) => val pairText = new Text(r1._2.toString + r2._2.toString) 
fastqRecordConverter.convertFragment((null, pairText)) } - FragmentRDD.fromRdd(rdd) + FragmentRDD(rdd, SequenceDictionary.empty, RecordGroupDictionary(Seq(recordGroup)), Seq.empty) } /** diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala index 21aa455fbc..4c73d3de87 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala @@ -275,7 +275,7 @@ class ADAMContextSuite extends ADAMFunSuite { val pathR1 = testFile("proper_pairs_1.fq") val pathR2 = testFile("proper_pairs_2.fq") val fragments = sc.loadPairedFastqAsFragments(pathR1, pathR2) - assert(fragments.rdd.count === 6) + assert(fragments.rdd.count === 3) } (1 to 4) foreach { testNumber =>