forked from bigdatagenomics/adam
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[ADAM-1359] Merge
reads2fragments
and fragments2reads
into `trans…
…formFragments` Resolves bigdatagenomics#1359. Also, propagates the quality score binner (bigdatagenomics#1485) up to the `transformFragments` CLI.
- Loading branch information
Showing
8 changed files
with
234 additions
and
205 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 0 additions & 91 deletions
91
adam-cli/src/main/scala/org/bdgenomics/adam/cli/Fragments2Reads.scala
This file was deleted.
Oops, something went wrong.
57 changes: 0 additions & 57 deletions
57
adam-cli/src/main/scala/org/bdgenomics/adam/cli/Reads2Fragments.scala
This file was deleted.
Oops, something went wrong.
140 changes: 140 additions & 0 deletions
140
adam-cli/src/main/scala/org/bdgenomics/adam/cli/TransformFragments.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
/** | ||
* Licensed to Big Data Genomics (BDG) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The BDG licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.bdgenomics.adam.cli | ||
|
||
import org.apache.spark.SparkContext | ||
import org.bdgenomics.adam.rdd.ADAMContext._ | ||
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs | ||
import org.bdgenomics.adam.rdd.read.QualityScoreBin | ||
import org.bdgenomics.adam.rdd.fragment.FragmentRDD | ||
import org.bdgenomics.utils.cli._ | ||
import org.bdgenomics.utils.misc.Logging | ||
import org.kohsuke.args4j.{ Argument, Option => Args4jOption } | ||
|
||
/**
 * Companion object for the transformFragments CLI command.
 *
 * Supplies the command's name and description, and constructs the command
 * from a raw argument array.
 */
object TransformFragments extends BDGCommandCompanion {
  val commandName = "transformFragments"
  val commandDescription = "Convert alignment records into fragment records."

  def apply(cmdLine: Array[String]) = {
    // parse the raw CLI args into a TransformFragmentsArgs, then wrap in the command
    val parsedArgs = Args4j[TransformFragmentsArgs](cmdLine)
    new TransformFragments(parsedArgs)
  }
}
|
||
/**
 * Argument definitions for the transformFragments command.
 *
 * Mixes in ADAMSaveAnyArgs and ParquetArgs so the save methods invoked in
 * TransformFragments.run (save / saveAsParquet) can read their configuration
 * from this object.
 */
class TransformFragmentsArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
  // positional arg 0: path to load fragments (or reads, with -load_as_reads) from
  @Argument(required = true, metaVar = "INPUT", usage = "The Fragment file to apply the transforms to", index = 0)
  var inputPath: String = null
  // positional arg 1: path to write the transformed output to
  @Argument(required = true, metaVar = "OUTPUT", usage = "Location to write the transformed fragments", index = 1)
  var outputPath: String = null
  @Args4jOption(required = false, name = "-load_as_reads", usage = "Treats the input data as reads")
  var loadAsReads: Boolean = false
  @Args4jOption(required = false, name = "-save_as_reads", usage = "Saves the output data as reads")
  var saveAsReads: Boolean = false
  @Args4jOption(required = false, name = "-single", usage = "Saves OUTPUT as single file")
  var asSingleFile: Boolean = false
  // only honored together with -save_as_reads; run() enforces this with require
  @Args4jOption(required = false, name = "-sort_reads", usage = "Sort the reads by referenceId and read position. Only valid if run with -save_as_reads")
  var sortReads: Boolean = false
  @Args4jOption(required = false, name = "-defer_merging", usage = "Defers merging single file output")
  var deferMerging: Boolean = false
  @Args4jOption(required = false, name = "-disable_fast_concat", usage = "Disables the parallel file concatenation engine.")
  var disableFastConcat: Boolean = false
  // only honored together with -save_as_reads; run() enforces this with require
  @Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.")
  var sortLexicographically: Boolean = false
  @Args4jOption(required = false, name = "-mark_duplicate_reads", usage = "Mark duplicate reads")
  var markDuplicates: Boolean = false
  // null means "do not bin"; parsed by QualityScoreBin in TransformFragments.maybeBin
  @Args4jOption(required = false, name = "-bin_quality_scores", usage = "Rewrites quality scores of reads into bins from a string of bin descriptions, e.g. 0,20,10;20,40,30.")
  var binQualityScores: String = null

  // this is required because of the ADAMSaveAnyArgs trait... fix this trait???
  var sortFastqOutput = false
}
|
||
/**
 * CLI command that loads fragments (or reads), optionally bins quality
 * scores and marks duplicates, and saves the result as fragments or reads.
 *
 * @param args Parsed command line arguments.
 */
class TransformFragments(protected val args: TransformFragmentsArgs) extends BDGSparkCommand[TransformFragmentsArgs] with Logging {
  val companion = TransformFragments

  /**
   * Marks duplicates if requested on the command line.
   *
   * @param reads An RDD of fragments.
   * @return The input with duplicates marked when -mark_duplicate_reads was
   *   given; otherwise, the input unchanged.
   */
  def maybeDedupe(reads: FragmentRDD): FragmentRDD = {
    if (args.markDuplicates) reads.markDuplicates() else reads
  }

  /**
   * Bins quality scores if requested on the command line.
   *
   * @param fragments An RDD of fragments.
   * @return The input with quality scores rewritten into bins when
   *   -bin_quality_scores was given; otherwise, the input unchanged.
   */
  private def maybeBin(fragments: FragmentRDD): FragmentRDD = {
    Option(args.binQualityScores) match {
      case Some(description) =>
        fragments.binQualityScores(QualityScoreBin(description))
      case None =>
        fragments
    }
  }

  def run(sc: SparkContext) {
    // both loading and saving as reads defeats the point of this command
    if (args.loadAsReads && args.saveAsReads) {
      log.warn("If loading and saving as reads, consider using TransformAlignments instead.")
    }
    // the sort flags only make sense when we are emitting reads
    require(!args.sortReads || args.saveAsReads,
      "-sort_reads is only valid if -save_as_reads is given.")
    require(!args.sortLexicographically || args.saveAsReads,
      "-sort_lexicographically is only valid if -save_as_reads is given.")

    // load, converting reads into fragments if the input is read formatted
    val loaded =
      if (args.loadAsReads) {
        sc.loadAlignments(args.inputPath).toFragments
      } else {
        sc.loadFragments(args.inputPath)
      }

    // bin quality scores first, then mark duplicates
    val transformed = maybeDedupe(maybeBin(loaded))

    if (args.saveAsReads) {
      // convert back to reads, sorting if requested; -sort_reads wins over
      // -sort_lexicographically if both are given
      val readRdd = transformed.toReads
      val toSave =
        if (args.sortReads) {
          readRdd.sortReadsByReferencePosition()
        } else if (args.sortLexicographically) {
          readRdd.sortReadsByReferencePositionAndIndex()
        } else {
          readRdd
        }

      toSave.save(args,
        isSorted = args.sortReads || args.sortLexicographically)
    } else {
      // fragment output always goes to Parquet
      transformed.saveAsParquet(args)
    }
  }
}
35 changes: 0 additions & 35 deletions
35
adam-cli/src/test/scala/org/bdgenomics/adam/cli/Reads2FragmentsSuite.scala
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.