diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAMMain.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAMMain.scala index c940319179..affbb7b237 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAMMain.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAMMain.scala @@ -32,8 +32,6 @@ object ADAMMain extends Logging { CommandGroup( "ADAM ACTIONS", List( - CompareADAM, - FindReads, CalculateDepth, CountKmers, CountContigKmers, @@ -66,7 +64,6 @@ object ADAMMain extends Logging { VizReads, PrintTags, ListDict, - SummarizeGenotypes, AlleleCount, BuildInformation, View diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CompareADAM.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CompareADAM.scala deleted file mode 100644 index 99d03db055..0000000000 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CompareADAM.scala +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.cli - -import java.io.{ PrintWriter, OutputStreamWriter } -import org.apache.hadoop.fs.{ Path, FileSystem } -import org.apache.hadoop.mapreduce.Job -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.metrics.{ DefaultComparisons, CombinedComparisons, BucketComparisons } -import org.bdgenomics.adam.projections.AlignmentRecordField._ -import org.bdgenomics.adam.projections.FieldValue -import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rdd.read.comparisons.ComparisonTraversalEngine -import org.bdgenomics.adam.util._ -import org.bdgenomics.utils.metrics.{ Collection, Histogram } -import org.bdgenomics.utils.metrics.aggregators.{ HistogramAggregator, CombinedAggregator, AggregatedCollection, Writable } -import org.kohsuke.args4j.{ Option => Args4jOption, Argument, CmdLineParser } -import scala.collection.Seq - -/** - * CompareADAM is a tool for pairwise comparison of ADAM files (or merged sets of ADAM files, see the - * note on the -recurse{1,2} optional parameters, below). - * - * The canonical use-case for CompareADAM involves a single input file run through (for example) two - * different implementations of the same pipeline, producing two comparable ADAM files at the end. - * - * CompareADAM will load these ADAM files and perform a read-name-based equi-join. It then computes - * one or more metrics (embodied as BucketComparisons values) across the joined records, as specified - * on the command-line, and aggregates each metric into a histogram (although, this can be modified if - * other aggregations are required in the future) and outputs the resulting histograms to a specified - * directory as text files. - * - * There is an R script in the adam-scripts module to process those outputs into a figure. 
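Since this commit removes CompareADAM outright, a minimal sketch of the join-and-aggregate pattern its scaladoc describes may help readers reconstruct the workflow elsewhere. It is written against plain Spark RDDs; the names readsA, readsB, and metric are hypothetical, and the removed code did this through ComparisonTraversalEngine (deleted further down in this diff).

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Equi-join two read sets on read name, score each matched pair with a
// metric, and aggregate the scores into a histogram (score -> count).
def compareByReadName[T](readsA: RDD[(String, T)],
                         readsB: RDD[(String, T)],
                         metric: (T, T) => Long): scala.collection.Map[Long, Long] =
  readsA.join(readsB)                          // read-name-based equi-join
    .map { case (_, (a, b)) => metric(a, b) }  // one metric value per pair
    .countByValue()                            // histogram of metric values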
- * - * The available metrics to be calculated are defined, by name, in the DefaultComparisons object. - * - * A subsequent tool like FindReads can be used to track down which reads give rise to particular aggregated - * bins in the output histograms, if further diagnosis is needed. - */ -object CompareADAM extends ADAMCommandCompanion with Serializable { - - val commandName: String = "compare" - val commandDescription: String = "Compare two ADAM files based on read name" - - def apply(cmdLine: Array[String]): ADAMCommand = { - new CompareADAM(Args4j[CompareADAMArgs](cmdLine)) - } - - type GeneratedResults[A] = RDD[(String, Seq[A])] - - /** - * @see CompareADAMArgs.recurse1, CompareADAMArgs.recurse2 - */ - def setupTraversalEngine(sc: SparkContext, - input1Path: String, - recurse1: String, - input2Path: String, - recurse2: String, - generator: BucketComparisons[Any]): ComparisonTraversalEngine = { - - val schemas = Seq[FieldValue]( - recordGroupId, - readName, - readMapped, - primaryAlignment, - readPaired, - firstOfPair) ++ generator.schemas - - new ComparisonTraversalEngine(schemas, sc.findFiles(new Path(input1Path), recurse1), sc.findFiles(new Path(input2Path), recurse2))(sc) - } - - def parseGenerators(nameList: String): Seq[BucketComparisons[Any]] = { - nameList match { - case null => DefaultComparisons.comparisons - case s => parseGenerators(s.split(",")) - } - } - - def parseGenerators(names: Seq[String]): Seq[BucketComparisons[Any]] = - names.map(DefaultComparisons.findComparison) -} - -class CompareADAMArgs extends Args4jBase with ParquetArgs with Serializable { - - @Argument(required = false, metaVar = "INPUT1", usage = "The first ADAM file to compare", index = 0) - val input1Path: String = null - - @Argument(required = false, metaVar = "INPUT2", usage = "The second ADAM file to compare", index = 1) - val input2Path: String = null - - @Args4jOption(required = false, name = "-recurse1", metaVar = "REGEX", - usage = "Optional regex; if specified, INPUT1 is recursively searched for directories matching this " + - "pattern, whose contents are loaded and merged prior to running the comparison") - val recurse1: String = null - - @Args4jOption(required = false, name = "-recurse2", metaVar = "REGEX", - usage = "Optional regex; if specified, INPUT2 is recursively searched for directories matching this " + - "pattern, whose contents are loaded and merged prior to running the comparison") - val recurse2: String = null - - @Args4jOption(required = false, name = "-comparisons", usage = "Comma-separated list of comparisons to run") - val comparisons: String = null - - @Args4jOption(required = false, name = "-list_comparisons", - usage = "If specified, lists all the comparisons that are available") - val listComparisons: Boolean = false - - @Args4jOption(required = false, name = "-output", metaVar = "DIRECTORY", - usage = "Directory to generate the comparison output files (default: output to STDOUT)") - val directory: String = null -} - -class CompareADAM(protected val args: CompareADAMArgs) extends ADAMSparkCommand[CompareADAMArgs] with Serializable { - - val companion: ADAMCommandCompanion = CompareADAM - - /** - * prints out a high-level summary of the compared files, including - * - * - the number of reads in each file - * - * - the number of reads which were unique to each file (i.e. 
not found in the other file, by name) - * - * - for each generator / aggregation created, prints out the total number of reads that went into the - * aggregation as well as the total which represent "identity" between the compared files (right now, - * this second number really only makes sense for the Histogram aggregated value, but that's the only - * aggregator we're using anyway.) - * - * @param engine The ComparisonTraversalEngine used for the aggregated values - * @param writer The PrintWriter to print the summary with. - */ - def printSummary(engine: ComparisonTraversalEngine, - generators: Seq[BucketComparisons[Any]], - aggregateds: Seq[Histogram[Any]], - writer: PrintWriter) { - - writer.println("%15s: %s".format("INPUT1", args.input1Path)) - writer.println("\t%15s: %d".format("total-reads", engine.named1.count())) - writer.println("\t%15s: %d".format("unique-reads", engine.uniqueToNamed1())) - - writer.println("%15s: %s".format("INPUT2", args.input2Path)) - writer.println("\t%15s: %d".format("total-reads", engine.named2.count())) - writer.println("\t%15s: %d".format("unique-reads", engine.uniqueToNamed2())) - - for ((generator, aggregated) <- generators.zip(aggregateds)) { - writer.println() - writer.println(generator.name) - - val count = aggregated.count() - val countIdentity = aggregated.countIdentical() - val diffFrac = (count - countIdentity).toDouble / count.toDouble - - writer.println("\t%15s: %d".format("count", count)) - writer.println("\t%15s: %d".format("identity", countIdentity)) - writer.println("\t%15s: %.5f".format("diff%", 100.0 * diffFrac)) - } - } - - def run(sc: SparkContext, job: Job): Unit = { - - // Work around for -list_comparisons (doesn't require @Arguments) - if (!args.listComparisons) { - if (args.input1Path == null || args.input2Path == null) { - val options = new CompareADAMArgs - val parser = new CmdLineParser(options) - println("\n2 Arguments (files) must be given as input.\n") - parser.printUsage(System.err) - sys.exit(1) - } - } else { - println("\nAvailable comparisons:") - DefaultComparisons.comparisons.foreach { - generator => - println("\t%10s : %s".format(generator.name, generator.description)) - } - return - } - - val generators: Seq[BucketComparisons[Any]] = CompareADAM.parseGenerators(args.comparisons) - val aggregators = (0 until generators.size).map(i => new HistogramAggregator[Any]()) - - val generator = new CombinedComparisons(generators) - val aggregator = new CombinedAggregator(aggregators) - - // Separately constructing the traversal engine, since we'll need it to generate the aggregated values - // and the summary statistics separately. - val engine = CompareADAM.setupTraversalEngine(sc, - args.input1Path, - args.recurse1, - args.input2Path, - args.recurse2, - generator) - - // generate the raw values... - val generated: CompareADAM.GeneratedResults[Collection[Seq[Any]]] = engine.generate(generator) - - // ... and aggregate them. 
- val values: AggregatedCollection[Any, Histogram[Any]] = ComparisonTraversalEngine.combine(generated, aggregator) - - val aggValues = values.asInstanceOf[AggregatedCollection[Any, Histogram[Any]]].values - - if (args.directory != null) { - val fs = FileSystem.get(sc.hadoopConfiguration) - val nameOutput = new OutputStreamWriter(fs.create(new Path(args.directory, "files"))) - nameOutput.write(args.input1Path) - nameOutput.write("\n") - nameOutput.write(args.input2Path) - nameOutput.write("\n") - nameOutput.close() - - val summaryOutput = new PrintWriter(new OutputStreamWriter(fs.create(new Path(args.directory, "summary.txt")))) - printSummary(engine, generators, aggValues, summaryOutput) - summaryOutput.close() - - generators.zip(aggValues).foreach { - case (gen, value) => { - val output = new OutputStreamWriter(fs.create(new Path(args.directory, gen.name))) - value.asInstanceOf[Writable].write(output) - - output.close() - } - } - } else { - - // Just send everything to STDOUT if the directory isn't specified. - val writer = new PrintWriter(System.out) - - printSummary(engine, generators, aggValues, writer) - - generators.zip(aggValues).foreach { - case (gen, value) => - writer.println(gen.name) - writer.println(value) - } - } - } -} \ No newline at end of file diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/FindReads.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/FindReads.scala deleted file mode 100644 index dad92c359e..0000000000 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/FindReads.scala +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.cli - -import org.kohsuke.args4j.{ Option => Args4jOption, Argument } -import org.apache.spark.SparkContext -import org.apache.hadoop.mapreduce.Job -import scala.collection.Seq -import java.util.regex.Pattern -import org.apache.hadoop.fs.{ Path, FileSystem } -import java.io.OutputStreamWriter -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.ReadBucket -import org.bdgenomics.adam.metrics.{ DefaultComparisons, BucketComparisons } -import org.bdgenomics.adam.metrics.filters.{ CombinedFilter, GeneratorFilter } - -/** - * FindReads is an auxiliary command to CompareADAM -- whereas CompareADAM takes two ADAM files (which - * presumably contain the same reads, processed differently), joins them based on read-name, and computes - * aggregates of one or more metrics (BucketComparisons) across those joined reads -- FindReads performs - * the same join-and-metric-computation, but takes a second argument as well: a boolean condition on the - * value(s) of the metric(s) computed. 
FindReads then outputs just the names of the reads whose metric - * value(s) meet the given condition. - * - * So, for example, CompareADAM might be used to find out that the same reads processed through two different - * pipelines are aligned to different locations 3% of the time. - * - * FindReads would then allow you to output the names of those 3% of the reads which are aligned differently, - * using a filter expression like "positions!=0". - */ -object FindReads extends ADAMCommandCompanion { - val commandName: String = "findreads" - val commandDescription: String = "Find reads that match particular individual or comparative criteria" - - def apply(cmdLine: Array[String]): ADAMCommand = { - new FindReads(Args4j[FindReadsArgs](cmdLine)) - } - - private val filterRegex = Pattern.compile("([^!=<>]+)((!=|=|<|>).*)") - - /** - * A 'filter' is a pair of a BucketComparisons value and a boolean condition on the values output - * by the comparison. The BucketComparisons object is specified by name, and parsed by the - * findComparison method in DefaultComparisons that maps strings to BucketComparisons objects. - * - * The boolean condition is specified by an operator { !=, =, <, > } and a value (which is - * an integer, floating point number, pair specified by the '(value,value)' notation, or boolean). - * - * For example, "positions!=0" would specify those reads whose corresponding distance between files - * was non-zero. - * - * Again, "dupemismatch=(1,0)" would specify those reads which are marked as a duplicate in the first - * file and _not_ in the second file. - * - * @param filterString A String conforming to the regex "[^!=<>]+(!=|=|<|>).*" - * @return A GeneratorFilter object which contains both the BucketComparisons value and the filter condition.
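A hedged usage sketch of the grammar just described: individual filters chain with ';' (see parseFilters below), and the "positions!=0" example reduces to a simple predicate over the generated per-read metric values. The composite filter string here merely combines the two documented examples; it is illustrative, not taken from surviving documentation.

import org.apache.spark.rdd.RDD

// Hypothetical composite filter; ';' separates individual filters,
// each of the form <comparison-name><op><value>.
val filter = FindReads.parseFilters("positions!=0;dupemismatch=(1,0)")

// What "positions!=0" amounts to once metric values are generated:
// keep the read names whose distance metric has any non-zero entry.
def readsAlignedDifferently(generated: RDD[(String, Seq[Long])]): RDD[String] =
  generated
    .filter { case (_, distances) => distances.exists(_ != 0L) }
    .map { case (name, _) => name }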
- */ - def parseFilter(filterString: String): GeneratorFilter[Any] = { - - val matcher = filterRegex.matcher(filterString) - if (!matcher.matches()) { throw new IllegalArgumentException(filterString) } - - val generator: BucketComparisons[Any] = DefaultComparisons.findComparison(matcher.group(1)) - val filterDef = matcher.group(2) - - generator.createFilter(filterDef) - } - - def parseFilters(filters: String): GeneratorFilter[Any] = - new CombinedFilter[Any](filters.split(";").map(parseFilter)) -} - -class FindReadsArgs extends Args4jBase with ParquetArgs with Serializable { - @Argument(required = true, metaVar = "INPUT1", usage = "The first ADAM file to compare", index = 0) - val input1Path: String = null - - @Argument(required = true, metaVar = "INPUT2", usage = "The second ADAM file to compare", index = 1) - val input2Path: String = null - - @Argument(required = true, metaVar = "FILTER", usage = "Filter to run", index = 2) - val filter: String = null - - @Args4jOption(required = false, name = "-recurse1", - usage = "Optional regex; if specified, INPUT1 is recursively searched for directories matching this " + - "pattern, whose contents are loaded and merged prior to running the comparison") - val recurse1: String = null - - @Args4jOption(required = false, name = "-recurse2", - usage = "Optional regex; if specified, INPUT2 is recursively searched for directories matching this " + - "pattern, whose contents are loaded and merged prior to running the comparison") - val recurse2: String = null - - @Args4jOption(required = false, name = "-file", usage = "File name to write the matching read names to") - val file: String = null -} - -class FindReads(protected val args: FindReadsArgs) extends ADAMSparkCommand[FindReadsArgs] with Serializable { - val companion: ADAMCommandCompanion = FindReads - - def run(sc: SparkContext, job: Job): Unit = { - - val filter = FindReads.parseFilters(args.filter) - val generator = filter.comparison - - val engine = CompareADAM.setupTraversalEngine(sc, args.input1Path, args.recurse1, args.input2Path, args.recurse2, generator) - - val generated: CompareADAM.GeneratedResults[Any] = engine.generate(generator) - - val filtered: RDD[(String, Seq[Any])] = generated.filter { - case (name: String, values: Seq[Any]) => - values.exists(filter.passesFilter) - } - - val filteredJoined = engine.joined.join(filtered).map { - case (name: String, ((bucket1: ReadBucket, bucket2: ReadBucket), generated: Seq[Any])) => { - val rec1 = bucket1.allReads().head - val rec2 = bucket2.allReads().head - (name, "%s:%d".format(rec1.getContig.getContigName, rec1.getStart), "%s:%d".format(rec2.getContig.getContigName, rec2.getStart)) - } - } - - if (args.file != null) { - val fs = FileSystem.get(sc.hadoopConfiguration) - val file = new OutputStreamWriter(fs.create(new Path(args.file))) - file.write(generator.name) - file.write("\n") - for (value <- filteredJoined.collect()) { - file.write(Seq(value._1, value._2, value._3).mkString("\t")) - file.write("\n") - } - } else { - // TODO generalize for files. 
- println(generator.name) - for (value <- filteredJoined.collect()) { - println(Seq(value._1, value._2, value._3).mkString("\t")) - } - } - - } - -} diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/GenotypeConcordance.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/GenotypeConcordance.scala deleted file mode 100644 index 25caa9ef8e..0000000000 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/GenotypeConcordance.scala +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.cli - -import org.apache.hadoop.mapreduce.Job -import org.apache.spark.{ SparkContext, Logging } -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.kohsuke.args4j.{ Argument, Option => Args4jOption } -import org.bdgenomics.adam.predicates.GenotypeRecordPASSPredicate -import org.bdgenomics.adam.projections.GenotypeField -import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rdd.variation.ConcordanceTable -import org.bdgenomics.formats.avro.Genotype - -object GenotypeConcordance extends ADAMCommandCompanion { - val commandName = "genotype_concordance" - val commandDescription = "Pairwise comparison of sets of ADAM genotypes" - - def apply(cmdLine: Array[String]) = { - new GenotypeConcordance(Args4j[GenotypeConcordanceArgs](cmdLine)) - } -} - -class GenotypeConcordanceArgs extends Args4jBase with ParquetArgs { - @Argument(required = true, metaVar = "test", usage = "The test ADAM genotypes file", index = 0) - var testGenotypesFile: String = _ - @Argument(required = true, metaVar = "truth", usage = "The truth ADAM genotypes file", index = 1) - var truthGenotypesFile: String = _ - @Args4jOption(required = false, name = "-include_non_pass", usage = "Include non-PASSing sites in concordance evaluation") - var includeNonPass: Boolean = false -} - -class GenotypeConcordance(protected val args: GenotypeConcordanceArgs) extends ADAMSparkCommand[GenotypeConcordanceArgs] with Logging { - val companion: ADAMCommandCompanion = GenotypeConcordance - - def run(sc: SparkContext, job: Job): Unit = { - // TODO: Figure out projections of nested fields - var project = List( - GenotypeField.variant, GenotypeField.sampleId, GenotypeField.alleles) - - val predicate = if (!args.includeNonPass) { - // We also need to project the filter field to use this predicate - // project :+= varIsFiltered - Some(classOf[GenotypeRecordPASSPredicate]) - } else - None - val projection = None //Some(Projection(project)) - - val testGTs: RDD[Genotype] = sc.adamLoad(args.testGenotypesFile, predicate, projection) - val truthGTs: RDD[Genotype] = sc.adamLoad(args.truthGenotypesFile, predicate, projection) - - val tables = testGTs.concordanceWith(truthGTs) - - // Write out results as a table - 
System.out.println("Sample\tConcordance\tNonReferenceSensitivity") - for ((sample, table) <- tables.collectAsMap()) { - System.out.println("%s\t%f\t%f".format(sample, table.concordance, table.nonReferenceSensitivity)) - } - { - val total = tables.values.fold(ConcordanceTable())((c1, c2) => c1.add(c2)) - System.out.println("ALL\t%f\t%f".format(total.concordance, total.nonReferenceSensitivity)) - } - - } -} diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/SummarizeGenotypes.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/SummarizeGenotypes.scala deleted file mode 100644 index 3bbfa394f2..0000000000 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/SummarizeGenotypes.scala +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.cli - -import org.apache.spark.rdd.RDD -import org.apache.spark.{ Logging, SparkContext } -import org.apache.hadoop.mapreduce.Job -import org.apache.hadoop.fs.{ Path, FileSystem } -import org.apache.hadoop.conf.Configuration -import org.bdgenomics.formats.avro.Genotype -import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rdd.variation.{ - GenotypesSummary, - GenotypesSummaryFormatting -} -import org.kohsuke.args4j -import java.io.{ OutputStreamWriter, BufferedWriter } - -object SummarizeGenotypes extends ADAMCommandCompanion { - - val commandName = "summarize_genotypes" - val commandDescription = "Print statistics of genotypes and variants in an ADAM file" - - def apply(cmdLine: Array[String]) = { - new SummarizeGenotypes(Args4j[SummarizeGenotypesArgs](cmdLine)) - } -} - -class SummarizeGenotypesArgs extends Args4jBase with ParquetArgs { - @args4j.Argument(required = true, metaVar = "ADAM", usage = "The ADAM genotypes file to print stats for", index = 0) - var adamFile: String = _ - - @args4j.Option(required = false, name = "-format", usage = "Format: one of human, csv. 
Default: human.") - var format: String = "human" - - @args4j.Option(required = false, name = "-out", usage = "Write output to the given file.") - var out: String = "" -} - -class SummarizeGenotypes(val args: SummarizeGenotypesArgs) extends ADAMSparkCommand[SummarizeGenotypesArgs] with Logging { - val companion = SummarizeGenotypes - - def run(sc: SparkContext, job: Job) { - val adamGTs: RDD[Genotype] = sc.adamLoad(args.adamFile) - val stats = GenotypesSummary(adamGTs) - val result = args.format match { - case "human" => GenotypesSummaryFormatting.format_human_readable(stats) - case "csv" => GenotypesSummaryFormatting.format_csv(stats) - case _ => throw new IllegalArgumentException("Invalid -format: %s".format(args.format)) - } - if (args.out.isEmpty) { - println(result) - } else { - val filesystem = FileSystem.get(new Configuration()) - val path = new Path(args.out) - val writer = new BufferedWriter(new OutputStreamWriter(filesystem.create(path, true))) - writer.write(result) - writer.close() - println("Wrote: %s".format(args.out)) - } - } -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/metrics/AvailableComparisons.scala b/adam-core/src/main/scala/org/bdgenomics/adam/metrics/AvailableComparisons.scala deleted file mode 100644 index a3c43a1955..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/metrics/AvailableComparisons.scala +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.bdgenomics.adam.metrics - -import org.bdgenomics.formats.avro.AlignmentRecord -import org.bdgenomics.adam.projections.FieldValue -import org.bdgenomics.adam.projections.AlignmentRecordField._ -import org.bdgenomics.adam.rich.RichAlignmentRecord._ -import org.bdgenomics.adam.util.Util._ -import scala.collection.Map -import org.bdgenomics.adam.models.ReadBucket - -object DefaultComparisons { - - val comparisons: Seq[BucketComparisons[Any]] = Seq[BucketComparisons[Any]]( - OverMatched, - DupeMismatch, - MappedPosition, - MapQualityScores, - BaseQualityScores) - - private val map = - comparisons.foldLeft( - Map[String, BucketComparisons[Any]]())( - (a: Map[String, BucketComparisons[Any]], b: BucketComparisons[Any]) => a + ((b.name, b))) - - def findComparison(k: String): BucketComparisons[Any] = - map.getOrElse(k, throw new ArrayIndexOutOfBoundsException( - String.format("Could not find comparison %s", k))) -} - -object OverMatched extends BooleanComparisons with Serializable { - val name = "overmatched" - val description = "Checks that all buckets have exactly 0 or 1 records" - - def matches(records1: Iterable[AlignmentRecord], records2: Iterable[AlignmentRecord]): Boolean = - records1.size == records2.size && (records1.size == 0 || records1.size == 1) - - def matchedByName(bucket1: ReadBucket, bucket2: ReadBucket): Seq[Boolean] = - Seq(matches(bucket1.unpairedPrimaryMappedReads, bucket2.unpairedPrimaryMappedReads) && - matches(bucket1.pairedFirstPrimaryMappedReads, bucket2.pairedFirstPrimaryMappedReads) && - matches(bucket1.pairedSecondPrimaryMappedReads, bucket2.pairedSecondPrimaryMappedReads) && - matches(bucket1.pairedFirstSecondaryMappedReads, bucket2.pairedFirstSecondaryMappedReads) && - matches(bucket1.pairedSecondSecondaryMappedReads, bucket2.pairedSecondSecondaryMappedReads)) - - def schemas: Seq[FieldValue] = Seq() -} - -object DupeMismatch extends PointComparisons with Serializable { - val name = "dupemismatch" - val description = "Counts the number of common reads marked as duplicates" - - def points(records1: Iterable[AlignmentRecord], records2: Iterable[AlignmentRecord]): Option[(Int, Int)] = { - if (records1.size == records2.size) { - records1.size match { - case 0 => None - case 1 => Some((if (records1.head.getDuplicateRead) 1 else 0, if (records2.head.getDuplicateRead) 1 else 0)) - case _ => None - } - } else None - } - - def matchedByName(bucket1: ReadBucket, bucket2: ReadBucket): Seq[(Int, Int)] = - Seq( - points(bucket1.unpairedPrimaryMappedReads, bucket2.unpairedPrimaryMappedReads), - points(bucket1.pairedFirstPrimaryMappedReads, bucket2.pairedFirstPrimaryMappedReads), - points(bucket1.pairedSecondPrimaryMappedReads, bucket2.pairedSecondPrimaryMappedReads), - points(bucket1.pairedFirstSecondaryMappedReads, bucket2.pairedFirstSecondaryMappedReads), - points(bucket1.pairedSecondSecondaryMappedReads, bucket2.pairedSecondSecondaryMappedReads)).flatten - - def schemas: Seq[FieldValue] = Seq(duplicateRead) -} - -object MappedPosition extends LongComparisons with Serializable { - val name = "positions" - val description = "Counts how many reads align to the same genomic location" - - def distance(records1: Iterable[AlignmentRecord], records2: Iterable[AlignmentRecord]): Long = { - if (records1.size == records2.size) records1.size match { - case 0 => 0 - case 1 => - val r1 = records1.head - val r2 = records2.head - if (isSameContig(r1.getContig, r2.getContig)) { - val start1 = r1.getStart - val start2 = r2.getStart - if (start1 > start2) start1 - start2 else 
start2 - start1 - } else -1 - case _ => -1 - } - else -1 - } - - /** - * The records have been matched by their names, but the rest may be mismatched. - */ - def matchedByName(bucket1: ReadBucket, bucket2: ReadBucket): Seq[Long] = - Seq(distance(bucket1.unpairedPrimaryMappedReads, bucket2.unpairedPrimaryMappedReads) + - distance(bucket1.pairedFirstPrimaryMappedReads, bucket2.pairedFirstPrimaryMappedReads) + - distance(bucket1.pairedSecondPrimaryMappedReads, bucket2.pairedSecondPrimaryMappedReads) + - distance(bucket1.pairedFirstSecondaryMappedReads, bucket2.pairedFirstSecondaryMappedReads) + - distance(bucket1.pairedSecondSecondaryMappedReads, bucket2.pairedSecondSecondaryMappedReads)) - - def schemas: Seq[FieldValue] = Seq( - start, - firstOfPair) -} - -object MapQualityScores extends PointComparisons with Serializable { - val name = "mapqs" - val description = "Creates scatter plot of mapping quality scores across identical reads" - - def points(records1: Iterable[AlignmentRecord], records2: Iterable[AlignmentRecord]): Option[(Int, Int)] = { - if (records1.size == records2.size) { - records1.size match { - case 0 => None - case 1 => Some((records1.head.getMapq.toInt, records2.head.getMapq.toInt)) - case _ => None - } - } else None - } - - def matchedByName(bucket1: ReadBucket, bucket2: ReadBucket): Seq[(Int, Int)] = - Seq( - points(bucket1.unpairedPrimaryMappedReads, bucket2.unpairedPrimaryMappedReads), - points(bucket1.pairedFirstPrimaryMappedReads, bucket2.pairedFirstPrimaryMappedReads), - points(bucket1.pairedSecondPrimaryMappedReads, bucket2.pairedSecondPrimaryMappedReads), - points(bucket1.pairedFirstSecondaryMappedReads, bucket2.pairedFirstSecondaryMappedReads), - points(bucket1.pairedSecondSecondaryMappedReads, bucket2.pairedSecondSecondaryMappedReads)).flatten - - def schemas: Seq[FieldValue] = Seq(mapq) -} - -object BaseQualityScores extends PointComparisons with Serializable { - val name = "baseqs" - val description = "Creates scatter plots of base quality scores across identical positions in the same reads" - - def points(records1: Iterable[AlignmentRecord], records2: Iterable[AlignmentRecord]): Seq[(Int, Int)] = { - if (records1.size == records2.size) { - records1.size match { - case 0 => Seq() - case 1 => - val record1 = records1.head - val record2 = records2.head - record1.qualityScores - .zip(record2.qualityScores) - .map(b => (b._1.toInt, b._2.toInt)) - case _ => Seq() - } - } else Seq() - } - - def matchedByName(bucket1: ReadBucket, bucket2: ReadBucket): Seq[(Int, Int)] = - points(bucket1.unpairedPrimaryMappedReads, bucket2.unpairedPrimaryMappedReads) ++ - points(bucket1.pairedFirstPrimaryMappedReads, bucket2.pairedFirstPrimaryMappedReads) ++ - points(bucket1.pairedSecondPrimaryMappedReads, bucket2.pairedSecondPrimaryMappedReads) ++ - points(bucket1.pairedFirstSecondaryMappedReads, bucket2.pairedFirstSecondaryMappedReads) ++ - points(bucket1.pairedSecondSecondaryMappedReads, bucket2.pairedSecondSecondaryMappedReads) - - def schemas: Seq[FieldValue] = Seq(qual) -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/metrics/Comparisons.scala b/adam-core/src/main/scala/org/bdgenomics/adam/metrics/Comparisons.scala deleted file mode 100644 index 273fc6136f..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/metrics/Comparisons.scala +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.metrics - -import java.util.regex.Pattern -import org.bdgenomics.adam.metrics.filters.ComparisonsFilter -import org.bdgenomics.adam.models.ReadBucket -import org.bdgenomics.adam.projections.FieldValue -import org.bdgenomics.utils.metrics.Collection - -trait BucketComparisons[+T] { - /** - * Name of the comparison. Should be identifiable, and able to be written on the command line. - */ - def name: String - - /** - * Description of the comparison, to be used by the "list comparisons" CLI option. - */ - def description: String - - /** - * All of the schemas which this comparison uses. - */ - def schemas: Seq[FieldValue] - - /** - * The records have been matched by their names, but the rest may be mismatched. - */ - def matchedByName(bucket1: ReadBucket, bucket2: ReadBucket): Seq[T] - - /** - * parses and creates a filter for this BucketComparisons value, conforming to the definition in - * the filterDef argument. - * - * @param filterDef a filter definition string, e.g. ">3" - * @return A ComparisonsFilter value that performs the indicated comparison on the - * output of 'this' (i.e. 
T values) - */ - def createFilter(filterDef: String): ComparisonsFilter[Any] - -} - -object BucketComparisons { - - val pointRegex = Pattern.compile("\\(\\s*(-?\\d+)\\s*,\\s*(-?\\d+)\\s*\\)") - - def parsePoint(pointString: String): (Int, Int) = { - - val pointMatcher = pointRegex.matcher(pointString) - if (!pointMatcher.matches()) { - throw new IllegalArgumentException("\"%s\" doesn't match point regex".format(pointString)) - } - - val p1 = pointMatcher.group(1).toInt - val p2 = pointMatcher.group(2).toInt - - (p1, p2) - } - - case class ParsedFilter(filter: String, value: String) { - def valueAsInt: Int = value.toInt - def valueAsDouble: Double = value.toDouble - def valueAsPoint: (Int, Int) = parsePoint(value) - def valueAsBoolean: Boolean = value.toBoolean - def valueAsLong: Long = value.toLong - - def assertFilterValues(filterName: String, legalValues: String*) { - if (!legalValues.contains(filter)) { - throw new IllegalArgumentException("\"%s\" isn't a legal filter value for %s".format(value, filterName)) - } - } - } - - val filterStringRegex = Pattern.compile("(!=|=|<|>)(.*)") - - def parseFilterString(filterString: String): ParsedFilter = { - val matcher = filterStringRegex.matcher(filterString) - if (!matcher.matches()) { - throw new IllegalArgumentException("\"%s\" doesn't match filter string pattern".format(filterString)) - } - - ParsedFilter(matcher.group(1), matcher.group(2)) - } - - class EqualityFilter[T](generator: BucketComparisons[T], target: T) extends ComparisonsFilter[T](generator) { - def passesFilter(value: Any): Boolean = value == target - } - - class InequalityFilter[T](generator: BucketComparisons[T], target: T) extends ComparisonsFilter[T](generator) { - def passesFilter(value: Any): Boolean = value != target - } - - class WrapperFilter[T](generator: BucketComparisons[T], f: (Any) => Boolean) extends ComparisonsFilter[T](generator) { - def passesFilter(value: Any): Boolean = f(value) - } -} - -class CombinedComparisons[T](inner: Seq[BucketComparisons[T]]) extends BucketComparisons[Collection[Seq[T]]] with Serializable { - /** - * Name of the comparison. Should be identifiable, and able to be written on the command line. - */ - def name: String = "(%s)".format(inner.map(_.name).reduce("%s+%s".format(_, _))) - - /** - * Description of the comparison, to be used by the "list comparisons" CLI option. - */ - def description: String = "A list of comparisons" - - /** - * All of the schemas which this comparison uses. - */ - def schemas: Seq[FieldValue] = inner.flatMap(_.schemas) - - /** - * The records have been matched by their names, but the rest may be mismatched. - */ - def matchedByName(bucket1: ReadBucket, bucket2: ReadBucket): Seq[Collection[Seq[T]]] = - Seq(Collection(inner.map(bc => bc.matchedByName(bucket1, bucket2)))) - - /** - * parses and creates a filter for this BucketComparisons value, conforming to the definition in - * the filterDef argument. - * - * @param filterDef a filter definition string, e.g. ">3" - * @return A ComparisonsFilter value that performs the indicated comparison on the - * output of 'this' (i.e. T values) - */ - def createFilter(filterDef: String): ComparisonsFilter[Any] = { - throw new UnsupportedOperationException() - } -} - -abstract class BooleanComparisons extends BucketComparisons[Boolean] { - - /** - * parses and creates a filter for this BucketComparisons value, conforming to the definition in - * the filterDef argument. - * - * @param filterDef a filter definition string, e.g. 
">3" - * @return A ComparisonsFilter value that performs the indicated comparison on the - * output of 'this' (i.e. T values) - */ - def createFilter(filterDef: String): ComparisonsFilter[Boolean] = { - import BucketComparisons._ - - val parsedFilter = parseFilterString(filterDef) - parsedFilter.assertFilterValues(name, "=") - new EqualityFilter(this, parsedFilter.valueAsBoolean) - } -} - -abstract class LongComparisons extends BucketComparisons[Long] { - - /** - * parses and creates a filter for this BucketComparisons value, conforming to the definition in - * the filterDef argument. - * - * @param filterDef a filter definition string, e.g. ">3" - * @return A ComparisonsFilter value that performs the indicated comparison on the - * output of 'this' (i.e. T values) - */ - def createFilter(filterDef: String): ComparisonsFilter[Long] = { - import BucketComparisons._ - - val parsedFilter = parseFilterString(filterDef) - parsedFilter.assertFilterValues(name, "=", ">", "<") - - parsedFilter.filter match { - case "=" => new EqualityFilter(this, parsedFilter.valueAsLong) - case ">" => new WrapperFilter(this, (value: Any) => value.asInstanceOf[Long] > parsedFilter.valueAsLong) - case "<" => new WrapperFilter(this, (value: Any) => value.asInstanceOf[Long] < parsedFilter.valueAsLong) - - case _ => throw new IllegalArgumentException( - "Unknown filter-type \"%s\" in filterDef \"%s\"".format(parsedFilter.filter, filterDef)) - } - } -} - -abstract class PointComparisons extends BucketComparisons[(Int, Int)] { - - /** - * parses and creates a filter for this BucketComparisons value, conforming to the definition in - * the filterDef argument. - * - * @param filterDef a filter definition string, e.g. ">3" - * @return A ComparisonsFilter value that performs the indicated comparison on the - * output of 'this' (i.e. T values) - */ - def createFilter(filterDef: String): ComparisonsFilter[(Int, Int)] = { - import BucketComparisons._ - - val parsedFilter = parseFilterString(filterDef) - parsedFilter.assertFilterValues(name, "=", "!=") - if (parsedFilter.filter == "=") new EqualityFilter(this, parsedFilter.valueAsPoint) - else new InequalityFilter(this, parsedFilter.valueAsPoint) - } -} - diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/metrics/filters/GeneratorFilter.scala b/adam-core/src/main/scala/org/bdgenomics/adam/metrics/filters/GeneratorFilter.scala deleted file mode 100644 index f82b189558..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/metrics/filters/GeneratorFilter.scala +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.bdgenomics.adam.metrics.filters - -import org.bdgenomics.adam.metrics.{ CombinedComparisons, BucketComparisons } -import org.bdgenomics.utils.metrics.{ Collection => BDGCollection } - -/** - * Used by FindReads, a GeneratorFilter is a predicate on values, which also wraps a particular - * BucketComparisons object (the thing that produces those values). - * - * BucketComparisons will generate a Seq[T] for each ReadBucket it's given -- FindReads then filters - * the ReadBuckets based on whether any element of the Seq[T] passes the GeneratorFilter (this - * allows us to handle both read-level and base-level comparison metrics and filters, e.g. 'find all - * matched ReadBuckets for which some base quality score doesn't match'). - * - */ -trait GeneratorFilter[+T] extends Serializable { - def passesFilter(value: Any): Boolean - def comparison: BucketComparisons[T] -} - -/** - * A utility class, so we only have to extend the 'passesFilter' method in subclasses. - * - * @param comparison The BucketComparisons value to wrap. - * @tparam T The type of value produced by the 'generator' argument, and filtered by this class. - */ -abstract class ComparisonsFilter[+T](val comparison: BucketComparisons[T]) extends GeneratorFilter[T] {} - -/** - * CombinedFilter lifts a Sequence of GeneratorFilter[T] filters into a single GeneratorFilter (which filters - * a vector, here reified as a metrics.Collection value, of Seq[T]). - */ -class CombinedFilter[T](val filters: Seq[GeneratorFilter[T]]) extends GeneratorFilter[BDGCollection[Seq[T]]] { - - def passesFilter(value: Any): Boolean = { - value match { - case valueCollection: BDGCollection[_] => - filters.zip(valueCollection.values).forall { - case (f: GeneratorFilter[T], values: Seq[T]) => - values.exists(f.passesFilter(_)) - } - } - } - - def comparison: BucketComparisons[BDGCollection[Seq[T]]] = new CombinedComparisons[T](filters.map(_.comparison)) -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/comparisons/ComparisonTraversalEngine.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/comparisons/ComparisonTraversalEngine.scala deleted file mode 100644 index 13b9ca360a..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/comparisons/ComparisonTraversalEngine.scala +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.bdgenomics.adam.rdd.read.comparisons - -import org.apache.hadoop.fs.Path -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.metrics.BucketComparisons -import org.bdgenomics.adam.metrics.filters.GeneratorFilter -import org.bdgenomics.adam.models.ReadBucket -import org.bdgenomics.adam.projections.{ FieldValue, Projection } -import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.formats.avro.AlignmentRecord -import org.bdgenomics.utils.metrics.aggregators.{ Aggregated, Aggregator } -import scala.reflect.ClassTag - -class ComparisonTraversalEngine(schema: Seq[FieldValue], input1: RDD[AlignmentRecord], input2: RDD[AlignmentRecord])(implicit sc: SparkContext) { - def this(schema: Seq[FieldValue], input1Paths: Seq[Path], input2Paths: Seq[Path])(implicit sc: SparkContext) = - this(schema, - sc.loadAlignmentsFromPaths(input1Paths), - sc.loadAlignmentsFromPaths(input2Paths))(sc) - - lazy val projection = Projection(schema: _*) - - lazy val named1 = input1.adamSingleReadBuckets() - .map(ReadBucket.singleReadBucketToReadBucket).keyBy(_.allReads().head.getReadName) - lazy val named2 = input2.adamSingleReadBuckets() - .map(ReadBucket.singleReadBucketToReadBucket).keyBy(_.allReads().head.getReadName) - - lazy val joined = named1.join(named2) - - def uniqueToNamed1(): Long = { - named1.leftOuterJoin(named2).filter { - case (name, (bucket1, Some(bucket2))) => false - case (name, (bucket1, None)) => true - }.count() - } - - def uniqueToNamed2(): Long = { - named2.leftOuterJoin(named1).filter { - case (name, (bucket1, Some(bucket2))) => false - case (name, (bucket1, None)) => true - }.count() - } - - def generate[T](generator: BucketComparisons[T]): RDD[(String, Seq[T])] = - ComparisonTraversalEngine.generate[T](joined, generator) - - def find[T](filter: GeneratorFilter[T]): RDD[String] = - ComparisonTraversalEngine.find[T](joined, filter) -} - -object ComparisonTraversalEngine { - - type JoinedType = RDD[(String, (ReadBucket, ReadBucket))] - type GeneratedType[T] = RDD[(String, Seq[T])] - - def generate[T](joined: JoinedType, generator: BucketComparisons[T]): GeneratedType[T] = - joined.map { - case (name, (bucket1, bucket2)) => - (name, generator.matchedByName(bucket1, bucket2)) - } - - def find[T](joined: JoinedType, filter: GeneratorFilter[T]): RDD[String] = - joined.filter { - case (name, (bucket1, bucket2)) => - filter.comparison.matchedByName(bucket1, bucket2).exists(filter.passesFilter) - }.map(_._1) - - def combine[T, A <: Aggregated[T]: ClassTag](generated: GeneratedType[T], aggregator: Aggregator[T, A]): A = - generated.aggregate[A](aggregator.initialValue)( - (aggregated, namedValue) => aggregator.combine(aggregated, aggregator.lift(namedValue._2)), - aggregator.combine) - -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ConcordanceTable.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ConcordanceTable.scala deleted file mode 100644 index cf169f45dd..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ConcordanceTable.scala +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.rdd.variation - -import java.util.EnumSet -import scala.collection.JavaConverters._ -import org.bdgenomics.formats.avro.GenotypeType - -object ConcordanceTable { - def apply() = new ConcordanceTable() - def apply(p: (GenotypeType, GenotypeType)) = (new ConcordanceTable()).add(p) - - // Relevant sub-groups of concordance table entries - val CALLED = EnumSet.of(GenotypeType.HOM_REF, GenotypeType.HET, GenotypeType.HOM_ALT) - val VARIANT = EnumSet.of(GenotypeType.HET, GenotypeType.HOM_ALT) - val ALL = EnumSet.allOf(classOf[GenotypeType]) - - implicit def typesToIdxs(types: EnumSet[GenotypeType]): Set[Int] = { - types.asScala.map(_.ordinal).toSet - } -} - -/** - * Helper class for maintaining the genotype concordance table and computing the relevant - * metrics. Table is indexed by genotype zygosity. Many of the metrics are based on the - * [[http://gatkforums.broadinstitute.org/discussion/48/using-varianteval GATK GenotypeConcordance Walker]] - * Table is organized as test vs. truth, i.e. rows correspond to "test" genotypes, columns - * to "truth" genotypes. - */ -class ConcordanceTable { - import ConcordanceTable._ - - private val table_ = Array.fill[Long](GenotypeType.values.length, GenotypeType.values.length)(0L) - - /** - * Add single genotype-genotype comparison into this table. - * @param p Tuple of (test, truth) GenotypeType - * @return this - */ - def add(p: (GenotypeType, GenotypeType)): ConcordanceTable = { - table_(p._1.ordinal)(p._2.ordinal) += 1L - this - } - - /** - * Add that ConcordanceTable into this table. - * @param that ConcordanceTable - * @return this - */ - def add(that: ConcordanceTable): ConcordanceTable = { - for (r <- ALL; c <- ALL) - table_(r)(c) += that.table_(r)(c) - this - } - - /** - * Get single table entry at (test, truth) - */ - def get(test: GenotypeType, truth: GenotypeType): Long = table_(test.ordinal)(truth.ordinal) - - def total(): Long = total(ALL, ALL) - def total(diagonal: EnumSet[GenotypeType]): Long = { - var t = 0L - for (i <- diagonal) - t += table_(i)(i) - t - } - - /** - * Total of all entries indexed by the cartesian product of test and truth - */ - def total(test: EnumSet[GenotypeType], truth: EnumSet[GenotypeType]): Long = { - var t = 0L - for (r <- test; c <- truth) - t += table_(r)(c) - t - } - - private def ratio(num: Long, dnm: Long) = if (dnm == 0) 0.0 else num.toDouble / dnm.toDouble - - /** - * Overall genotype concordance, or the percentage of identical genotypes (including homozygous reference calls) - */ - def concordance = ratio(total(CALLED), total(CALLED, CALLED)) - - /** - * Non-reference sensitivity or NRS is a site-level variant sensitivity metric. - */ - def nonReferenceSensitivity = ratio(total(VARIANT, VARIANT), total(ALL, VARIANT)) - - /** - * Non-reference concordance or NRC is similar to NRS, but requires strict zygosity matching - * in the numerator.
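A small worked example of the metrics above, with invented counts: two exact HET matches plus one site where the test call is HET but the truth is HOM_ALT. All three pairs are called, two are identical, so concordance is 2/3.

import org.bdgenomics.formats.avro.GenotypeType

// Invented (test, truth) observations; add() returns `this`, so calls chain.
val table = ConcordanceTable((GenotypeType.HET, GenotypeType.HET))
  .add((GenotypeType.HET, GenotypeType.HET))
  .add((GenotypeType.HET, GenotypeType.HOM_ALT))
println(table.concordance) // 2 identical / 3 called pairs ≈ 0.667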
- */ - def nonReferenceConcordance = ratio(total(VARIANT), total(ALL, VARIANT)) - - /** - * Alias for nonReferenceConcordance - */ - def recall = nonReferenceConcordance - - /** - * Non-reference discrepancy is a measure of discrepant calls, excluding matching - * homozygous reference genotypes, which are easier to call. - */ - def nonReferenceDiscrepancy = { - val all_called = total(ALL, ALL) - ratio(all_called - total(ALL), all_called - get(GenotypeType.HOM_REF, GenotypeType.HOM_REF)) - } - - /** - * Precision metric. This metric is similar to NRC but with "test" and "truth" reversed. - */ - def precision = ratio(total(VARIANT), total(VARIANT, ALL)) -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/GenotypesSummary.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/GenotypesSummary.scala deleted file mode 100644 index e69f338f03..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/GenotypesSummary.scala +++ /dev/null @@ -1,320 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.rdd.variation - -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.rdd.variation.GenotypesSummary.StatisticsMap -import org.bdgenomics.adam.rdd.variation.GenotypesSummaryCounts.ReferenceAndAlternate -import org.bdgenomics.adam.rich.RichVariant._ -import org.bdgenomics.formats.avro.{ Genotype, GenotypeAllele } -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.immutable.Map - -/** - * Simple counts of various properties across a set of genotypes. - * - * Note: for counts of variants, both homozygous and heterozygous - * count as 1 (i.e. homozygous alternate is NOT counted as 2). - * This seems to be the most common convention. - * - * @param genotypesCounts Counts of genotypes: map from list of GenotypeAllele (of size ploidy) -> count - * @param singleNucleotideVariantCounts Map from ReferenceAndAlternate -> count - * where ReferenceAndAlternate is a single base variant. 
- * @param multipleNucleotideVariantCount Count of multiple nucleotide variants (e.g.: AA -> TG) - * @param insertionCount Count of insertions - * @param deletionCount Count of deletions - * @param readCount Sum of read depths for all genotypes with a called variant - * @param phasedCount Number of genotypes with phasing information - * - */ - -case class GenotypesSummaryCounts( - genotypesCounts: GenotypesSummaryCounts.GenotypeAlleleCounts, - singleNucleotideVariantCounts: GenotypesSummaryCounts.VariantCounts, - multipleNucleotideVariantCount: Long, - insertionCount: Long, - deletionCount: Long, - readCount: Option[Long], - phasedCount: Long) { - - lazy val genotypesCount: Long = genotypesCounts.values.sum - lazy val variantGenotypesCount: Long = - genotypesCounts.keys.filter(_.contains(GenotypeAllele.Alt)).map(genotypesCounts(_)).sum - lazy val singleNucleotideVariantCount: Long = singleNucleotideVariantCounts.values.sum - lazy val transitionCount: Long = GenotypesSummaryCounts.transitions.map(singleNucleotideVariantCounts).sum - lazy val transversionCount: Long = GenotypesSummaryCounts.transversions.map(singleNucleotideVariantCounts).sum - lazy val noCallCount: Long = genotypesCounts.count(_._1.contains(GenotypeAllele.NoCall)) - lazy val averageReadDepthAtVariants = - if (variantGenotypesCount == 0) None - else for (readCount1 <- readCount) yield readCount1.toDouble / variantGenotypesCount.toDouble - lazy val withDefaultZeroCounts = GenotypesSummaryCounts( - genotypesCounts.withDefaultValue(0.toLong), - singleNucleotideVariantCounts.withDefaultValue(0.toLong), - multipleNucleotideVariantCount, - insertionCount, - deletionCount, - readCount, - phasedCount) - - def combine(that: GenotypesSummaryCounts): GenotypesSummaryCounts = { - def combine_counts[A](map1: Map[A, Long], map2: Map[A, Long]): Map[A, Long] = { - val keys: Set[A] = map1.keySet.union(map2.keySet) - val pairs = keys.map(k => (k -> (map1.getOrElse(k, 0.toLong) + map2.getOrElse(k, 0.toLong)))) - pairs.toMap - } - GenotypesSummaryCounts( - combine_counts(genotypesCounts, that.genotypesCounts), - combine_counts(singleNucleotideVariantCounts, that.singleNucleotideVariantCounts), - multipleNucleotideVariantCount + that.multipleNucleotideVariantCount, - insertionCount + that.insertionCount, - deletionCount + that.deletionCount, - for (readCount1 <- readCount; readcount2 <- that.readCount) yield readCount1 + readcount2, - phasedCount + that.phasedCount) - } -} -object GenotypesSummaryCounts { - case class ReferenceAndAlternate(reference: String, alternate: String) - - type GenotypeAlleleCounts = Map[List[GenotypeAllele], Long] - type VariantCounts = Map[ReferenceAndAlternate, Long] - - val simpleNucleotides = List("A", "C", "T", "G") - - val transitions = List( - ReferenceAndAlternate("A", "G"), - ReferenceAndAlternate("G", "A"), - ReferenceAndAlternate("C", "T"), - ReferenceAndAlternate("T", "C")) - - val transversions = List( - ReferenceAndAlternate("A", "C"), - ReferenceAndAlternate("C", "A"), - ReferenceAndAlternate("A", "T"), - ReferenceAndAlternate("T", "A"), - ReferenceAndAlternate("G", "C"), - ReferenceAndAlternate("C", "G"), - ReferenceAndAlternate("G", "T"), - ReferenceAndAlternate("T", "G")) - - /** - * Factory for an empty GenotypesSummaryCounts. 
-   */
-  def apply(): GenotypesSummaryCounts =
-    GenotypesSummaryCounts(
-      Map(),
-      Map(),
-      0, // Multiple nucleotide variants
-      0, // Insertion count
-      0, // Deletion count
-      Some(0), // Read count
-      0) // Phased count
-
-  def apply(counts: GenotypesSummaryCounts) {
-    assert(false)
-  }
-
-  /**
-   * Factory for a GenotypesSummaryCounts that counts a single Genotype.
-   */
-  def apply(genotype: Genotype): GenotypesSummaryCounts = {
-    val variant = genotype.getVariant
-    val ref_and_alt = ReferenceAndAlternate(variant.getReferenceAllele.toString,
-      variant.getAlternateAllele.toString)
-
-    // We always count our genotype. The other counts are set to 1 only if we have a variant genotype.
-    val isVariant = genotype.getAlleles.contains(GenotypeAllele.Alt)
-    val genotypeAlleleCounts = Map(genotype.getAlleles.asScala.toList -> 1.toLong)
-    val variantCounts = (
-      if (isVariant && variant.isSingleNucleotideVariant) Map(ref_and_alt -> 1.toLong)
-      else Map(): VariantCounts)
-
-    val readDepth = (
-      if (genotype.getReadDepth == null) None
-      else if (isVariant) Some(genotype.getReadDepth.toLong)
-      else Some(0.toLong))
-
-    GenotypesSummaryCounts(
-      genotypeAlleleCounts,
-      variantCounts,
-      if (isVariant && variant.isMultipleNucleotideVariant) 1 else 0,
-      if (isVariant && variant.isInsertion) 1 else 0,
-      if (isVariant && variant.isDeletion) 1 else 0,
-      readDepth,
-      if (isVariant && genotype.getIsPhased != null && genotype.getIsPhased) 1 else 0)
-  }
-}
-
-/**
- * Summary statistics for a set of genotypes.
- * @param perSampleStatistics A map from sample id -> GenotypesSummaryCounts for that sample
- * @param singletonCount Number of variants that are called in exactly one sample.
- * @param distinctVariantCount Number of distinct variants that are called at least once.
- */
-case class GenotypesSummary(
-  perSampleStatistics: StatisticsMap,
-  singletonCount: Long,
-  distinctVariantCount: Long) {
-  lazy val aggregateStatistics =
-    perSampleStatistics.values.foldLeft(GenotypesSummaryCounts())(_.combine(_)).withDefaultZeroCounts
-}
-
-object GenotypesSummary {
-  type StatisticsMap = Map[String, GenotypesSummaryCounts]
-
-  /**
-   * Factory for a GenotypesSummary given an RDD of Genotype.
-   */
-  def apply(rdd: RDD[Genotype]): GenotypesSummary = {
-    def combineStatisticsMap(stats1: StatisticsMap, stats2: StatisticsMap): StatisticsMap = {
-      stats1.keySet.union(stats2.keySet).map(sample => {
-        (stats1.get(sample), stats2.get(sample)) match {
-          case (Some(statsA), Some(statsB)) => sample -> statsA.combine(statsB)
-          case (Some(stats), None)          => sample -> stats
-          case (None, Some(stats))          => sample -> stats
-          case (None, None)                 => throw new AssertionError("Unreachable")
-        }
-      }).toMap
-    }
-    val perSampleStatistics: StatisticsMap = rdd
-      .map(genotype => Map(genotype.getSampleId.toString -> GenotypesSummaryCounts(genotype)))
-      .fold(Map(): StatisticsMap)(combineStatisticsMap(_, _))
-      .map({ case (sample: String, stats: GenotypesSummaryCounts) => sample -> stats.withDefaultZeroCounts }).toMap
-    val variantCounts =
-      rdd.filter(_.getAlleles.contains(GenotypeAllele.Alt)).map(genotype => {
-        val variant = genotype.getVariant
-        (variant.getContig, variant.getStart, variant.getReferenceAllele, variant.getAlternateAllele)
-      }).countByValue
-    val singletonCount = variantCounts.count(_._2 == 1)
-    val distinctVariantsCount = variantCounts.size
-    GenotypesSummary(perSampleStatistics, singletonCount, distinctVariantsCount)
-  }
-}
-
-/**
- * Functions for converting a GenotypesSummary object to various text formats.
- */
-object GenotypesSummaryFormatting {
-  def format_csv(summary: GenotypesSummary): String = {
-
-    val genotypeAlleles = sortedGenotypeAlleles(summary.aggregateStatistics)
-
-    def format_statistics(stats: GenotypesSummaryCounts): Seq[String] = {
-      val row = mutable.MutableList[String]()
-      row += stats.genotypesCount.toString
-      row += stats.variantGenotypesCount.toString
-      row += stats.insertionCount.toString
-      row += stats.deletionCount.toString
-      row += stats.singleNucleotideVariantCount.toString
-      row += stats.transitionCount.toString
-      row += stats.transversionCount.toString
-      row += (stats.transitionCount.toDouble / stats.transversionCount.toDouble).toString
-      row ++= genotypeAlleles.map(stats.genotypesCounts(_).toString) // Genotype counts
-      row ++= allSNVs.map(stats.singleNucleotideVariantCounts(_).toString) // SNV counts
-      row
-    }
-
-    val basicHeader = List(
-      "Sample", "Genotypes", "Variant Genotypes", "Insertions", "Deletions", "SNVs", "Transitions", "Transversions", "Ti / Tv")
-    val genotypesHeader = genotypeAlleles.map(genotypeAllelesToString(_))
-    val snvsHeader = allSNVs.map(snv => "%s>%s".format(snv.reference, snv.alternate))
-
-    val result = new mutable.StringBuilder
-    result ++= "# " + (basicHeader ++ genotypesHeader ++ snvsHeader).mkString(", ") + "\n"
-
-    for ((sample, stats) <- summary.perSampleStatistics) {
-      val row = mutable.MutableList(sample)
-      row ++= format_statistics(stats)
-      result ++= row.mkString(", ") + "\n"
-    }
-    val final_row = List("Aggregated") ++ format_statistics(summary.aggregateStatistics)
-    result ++= final_row.mkString(", ") + "\n"
-    result.toString
-  }
-
-  def format_human_readable(summary: GenotypesSummary): String = {
-    def format_statistics(stats: GenotypesSummaryCounts, result: mutable.StringBuilder) = {
-      result ++= "\tVariant Genotypes: %d / %d = %1.3f%%\n".format(
-        stats.variantGenotypesCount,
-        stats.genotypesCount,
-        stats.variantGenotypesCount.toDouble * 100.0 / stats.genotypesCount)
-
-      for (genotype <- sortedGenotypeAlleles(summary.aggregateStatistics)) {
-        val count = stats.genotypesCounts(genotype)
-        result ++= "\t%20s: %9d = %1.3f%%\n".format(
-          genotypeAllelesToString(genotype),
-          count,
-          count.toDouble * 100.0 / stats.genotypesCount.toDouble)
-      }
-      result ++= "\tInsertions: %d\n".format(stats.insertionCount)
-      result ++= "\tDeletions: %d\n".format(stats.deletionCount)
-      result ++= "\tMultiple nucleotide variants: %d\n".format(stats.multipleNucleotideVariantCount)
-      result ++= "\tSingle nucleotide variants: %d\n".format(stats.singleNucleotideVariantCount)
-      result ++= "\t\tTransitions / transversions: %4d / %4d = %1.3f\n".format(
-        stats.transitionCount,
-        stats.transversionCount,
-        stats.transitionCount.toDouble / stats.transversionCount.toDouble)
-      for (snv <- allSNVs) {
-        result ++= "\t\t%s>%s %9d\n".format(snv.reference, snv.alternate, stats.singleNucleotideVariantCounts(snv))
-      }
-      result ++= "\tAverage read depth at called variants: %s\n".format(stats.averageReadDepthAtVariants match {
-        case Some(depth) => "%1.1f".format(depth)
-        case None        => "[no variant calls, or read depth missing for one or more variant calls]"
-      })
-      result ++= "\tPhased genotypes: %d / %d = %1.3f%%\n".format(
-        stats.phasedCount,
-        stats.genotypesCount,
-        stats.phasedCount.toDouble * 100 / stats.genotypesCount)
-    }
-
-    val result = new mutable.StringBuilder
-    for (sample <- summary.perSampleStatistics.keySet.toList.sorted) {
-      result ++= "Sample: %s\n".format(sample)
-      format_statistics(summary.perSampleStatistics(sample), result)
-      result ++= "\n"
-    }
-    result ++= "\nSummary\n"
-    result ++= "\tSamples: %d\n".format(summary.perSampleStatistics.size)
-    result ++= "\tDistinct variants: %d\n".format(summary.distinctVariantCount)
-    result ++= "\tVariants found only in a single sample: %d = %1.3f%%\n".format(
-      summary.singletonCount,
-      summary.singletonCount.toDouble * 100.0 / summary.distinctVariantCount)
-    format_statistics(summary.aggregateStatistics, result)
-    result.toString
-  }
-
-  private def sortedGenotypeAlleles(stats: GenotypesSummaryCounts): Seq[List[GenotypeAllele]] = {
-    def genotypeSortOrder(genotype: List[GenotypeAllele]): Int = genotype.map({
-      case GenotypeAllele.Ref                           => 0
-      case GenotypeAllele.Alt | GenotypeAllele.OtherAlt => 1 // alt/otheralt sort to same point
-      case GenotypeAllele.NoCall                        => 10 // arbitrary large number so any genotype with a NoCall sorts last.
-    }).sum
-    stats.genotypesCounts.keySet.toList.sortBy(genotypeSortOrder(_))
-  }
-
-  private def genotypeAllelesToString(alleles: List[GenotypeAllele]): String =
-    alleles.map(_.toString).mkString("-")
-
-  lazy val allSNVs: Seq[ReferenceAndAlternate] =
-    for (
-      from <- GenotypesSummaryCounts.simpleNucleotides;
-      to <- GenotypesSummaryCounts.simpleNucleotides;
-      if (from != to)
-    ) yield GenotypesSummaryCounts.ReferenceAndAlternate(from, to)
-
-}
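That is the last of GenotypesSummary.scala. Pipelines that relied on SummarizeGenotypes can reproduce the core bookkeeping in a few lines; a hedged sketch using only the standard library, following the same pointwise-merge and transition-list conventions as the deleted class:

// Pointwise merge of two count maps, as GenotypesSummaryCounts.combine did,
// so per-partition summaries can be folded into one.
def mergeCounts[K](a: Map[K, Long], b: Map[K, Long]): Map[K, Long] =
  (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap

// Ti/Tv from a (ref, alt) -> count map of single-base variants; every
// non-transition SNV is a transversion.
val transitions = Set(("A", "G"), ("G", "A"), ("C", "T"), ("T", "C"))
def tiTv(snvCounts: Map[(String, String), Long]): Double = {
  val ti = snvCounts.collect { case (k, n) if transitions(k) => n }.sum
  val tv = snvCounts.collect { case (k, n) if !transitions(k) => n }.sum
  ti.toDouble / tv
}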
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/VariationRDDFunctions.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/VariationRDDFunctions.scala
index 2b5af98d00..b2b5f7dfad 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/VariationRDDFunctions.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/VariationRDDFunctions.scala
@@ -121,39 +121,6 @@ class GenotypeRDDFunctions(rdd: RDD[Genotype]) extends Serializable with Logging
       .map { case (v: RichVariant, g) => new VariantContext(v, g, None) }
   }
 
-  /**
-   * Compute the per-sample ConcordanceTable for these genotypes vs. the supplied
-   * truth dataset. Only genotypes with ploidy <= 2 will be considered.
-   * @param truth Truth genotypes
-   * @return PairedRDD of sample -> ConcordanceTable
-   */
-  def concordanceWith(truth: RDD[Genotype]): RDD[(String, ConcordanceTable)] = {
-    // Concordance approach only works for ploidy <= 2, e.g. diploid/haploid
-    val keyedTest = rdd.filter(_.ploidy <= 2)
-      .keyBy(g => (g.getVariant, g.getSampleId.toString): (RichVariant, String))
-    val keyedTruth = truth.filter(_.ploidy <= 2)
-      .keyBy(g => (g.getVariant, g.getSampleId.toString): (RichVariant, String))
-
-    val inTest = keyedTest.leftOuterJoin(keyedTruth)
-    val justInTruth = keyedTruth.subtractByKey(inTest)
-
-    // Compute RDD[sample -> ConcordanceTable] across variants/samples
-    val inTestPairs = inTest.map({
-      case ((_, sample), (l, Some(r))) => sample -> (l.getType, r.getType)
-      case ((_, sample), (l, None))    => sample -> (l.getType, GenotypeType.NO_CALL)
-    })
-    val justInTruthPairs = justInTruth.map({ // "truth-only" entries
-      case ((_, sample), r) => sample -> (GenotypeType.NO_CALL, r.getType)
-    })
-
-    val bySample = inTestPairs.union(justInTruthPairs).combineByKey(
-      (p: (GenotypeType, GenotypeType)) => ConcordanceTable(p),
-      (l: ConcordanceTable, r: (GenotypeType, GenotypeType)) => l.add(r),
-      (l: ConcordanceTable, r: ConcordanceTable) => l.add(r))
-
-    bySample
-  }
-
   def filterByOverlappingRegion(query: ReferenceRegion): RDD[Genotype] = {
     def overlapsQuery(rec: Genotype): Boolean =
       rec.getVariant.getContig.getContigName.toString == query.referenceName &&
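The deleted concordanceWith is, at bottom, a single outer-join pattern, and it can be rebuilt on plain pair RDDs if needed. A minimal sketch under that reading, with generic keys and genotype values standing in for ADAM's (RichVariant, sample) keys and Genotype records; the names here are illustrative:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Pair every test call with its truth call (or a stand-in "no call"), then add
// back the truth-only keys, yielding the (test, truth) pairs a table is built from.
def pairCalls[K: ClassTag, V: ClassTag](test: RDD[(K, V)],
                                        truth: RDD[(K, V)],
                                        noCall: V): RDD[(V, V)] = {
  val inTest = test.leftOuterJoin(truth).map { case (_, (t, r)) => (t, r.getOrElse(noCall)) }
  val truthOnly = truth.subtractByKey(test).map { case (_, r) => (noCall, r) }
  inTest.union(truthOnly)
}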
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/metrics/ComparisonsSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/metrics/ComparisonsSuite.scala
deleted file mode 100644
index 309dbc5ad9..0000000000
--- a/adam-core/src/test/scala/org/bdgenomics/adam/metrics/ComparisonsSuite.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.metrics
-
-import org.bdgenomics.adam.util.ADAMFunSuite
-import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig }
-import org.bdgenomics.adam.models.SingleReadBucket
-import org.bdgenomics.utils.metrics._
-
-class ComparisonsSuite extends ADAMFunSuite {
-  var bucket: SingleReadBucket = null
-  var bucketMapq: SingleReadBucket = null
-  var bucketMapqUnset: SingleReadBucket = null
-  var bucketDuplicate: SingleReadBucket = null
-  var bucketQual: SingleReadBucket = null
-  var bucketQualUnset: SingleReadBucket = null
-  var bucketMovedChromosome: SingleReadBucket = null
-  var bucketMovedStart: SingleReadBucket = null
-
-  sparkBefore("Generators setup") {
-    def srb(record: AlignmentRecord): SingleReadBucket = {
-      val seq = Seq(record)
-      val rdd = sc.makeRDD(seq)
-      val srbRDD = SingleReadBucket(rdd)
-      srbRDD.first()
-    }
-
-    val contig: Contig = Contig.newBuilder
-      .setContigName("chr1")
-      .build
-    val contig2: Contig = Contig.newBuilder
-      .setContigName("chr2")
-      .build
-    val record: AlignmentRecord = AlignmentRecord.newBuilder()
-      .setContig(contig)
-      .setReadName("test")
-      .setDuplicateRead(false)
-      .setMapq(10)
-      .setQual("abcdef")
-      .setStart(100)
-      .setPrimaryAlignment(true)
-      .setRecordGroupName("groupid")
-      .setReadMapped(true)
-      .build()
-
-    bucket = srb(record)
-
-    bucketMapq = srb(AlignmentRecord.newBuilder(record)
-      .setMapq(11)
-      .build())
-
-    bucketMapqUnset = srb(AlignmentRecord.newBuilder()
-      .setContig(contig)
-      .setReadName("test")
-      .setDuplicateRead(false)
-      .setQual("abcdef")
-      .setStart(100)
-      .setPrimaryAlignment(true)
-      .setRecordGroupName("groupid")
-      .setReadMapped(true)
-      .build())
-
-    bucketDuplicate = srb(AlignmentRecord.newBuilder(record)
-      .setDuplicateRead(true)
-      .build())
-
-    bucketQual = srb(AlignmentRecord.newBuilder(record)
-      .setQual("fedcba")
-      .build())
-
-    bucketQualUnset = srb(AlignmentRecord.newBuilder()
-      .setContig(contig)
-      .setReadName("test")
-      .setDuplicateRead(false)
-      .setMapq(10)
-      .setStart(100)
-      .setPrimaryAlignment(true)
-      .setRecordGroupName("groupid")
-      .setReadMapped(true)
-      .build())
-
-    bucketMovedChromosome = srb(AlignmentRecord.newBuilder(record)
-      .setContig(contig2)
-      .setStart(200)
-      .build())
-
-    bucketMovedStart = srb(AlignmentRecord.newBuilder(record)
-      .setStart(200)
-      .build())
-  }
-
-  sparkTest("Dupe mismatches found") {
-    assert(DupeMismatch.matchedByName(bucket, bucket) === Seq((0, 0)))
-    assert(DupeMismatch.matchedByName(bucket, bucketDuplicate) === Seq((0, 1)))
-  }
-
-  sparkTest("Mismatched mapped positions histogram generated") {
-    assert(Histogram(MappedPosition.matchedByName(bucket, bucket)).valueToCount(0) === 1)
-    assert(Histogram(MappedPosition.matchedByName(bucket, bucketMovedChromosome)).valueToCount.get(0).isEmpty)
-    assert(Histogram(MappedPosition.matchedByName(bucket, bucketMovedChromosome)).valueToCount(-1) === 1)
-    assert(Histogram(MappedPosition.matchedByName(bucket, bucketMovedStart)).valueToCount.get(0).isEmpty)
-    assert(Histogram(MappedPosition.matchedByName(bucket, bucketMovedStart)).valueToCount(100) === 1)
-  }
-
-  sparkTest("Test map quality scores") {
-    assert(MapQualityScores.matchedByName(bucket, bucket).contains((10, 10)))
-    assert({
-      val points = MapQualityScores.matchedByName(bucket, bucketMapq)
-      points.contains((10, 11)) || points.contains((11, 10))
-    })
-  }
-
-  sparkTest("Test base quality scores") {
-    assert(BaseQualityScores.matchedByName(bucket, bucket).forall(a => a._1 == a._2))
-  }
-}
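The metrics exercised by this suite each map a pair of alignments of the same read to a value that is then histogrammed. A collections-only sketch of the MappedPosition-style comparison under that reading (Pos and the -1 sentinel are illustrative, chosen to match the assertions above):

// Compare two alignments of the same read: the positional delta when the
// contig matches, or a -1 sentinel when the read moved to another contig.
case class Pos(contig: String, start: Long)

def positionDelta(a: Pos, b: Pos): Long =
  if (a.contig == b.contig) b.start - a.start else -1L

val deltas = Seq(
  positionDelta(Pos("chr1", 100), Pos("chr1", 100)), // 0: same position
  positionDelta(Pos("chr1", 100), Pos("chr1", 200)), // 100: moved start
  positionDelta(Pos("chr1", 100), Pos("chr2", 200))) // -1: moved chromosome
val histogram = deltas.groupBy(identity).mapValues(_.size) // value -> count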
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/metrics/filters/GeneratorFilterSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/metrics/filters/GeneratorFilterSuite.scala
deleted file mode 100644
index b07afcef73..0000000000
--- a/adam-core/src/test/scala/org/bdgenomics/adam/metrics/filters/GeneratorFilterSuite.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.metrics.filters
-
-import org.scalatest._
-
-import org.bdgenomics.utils.metrics.{ Collection => BDGCollection }
-
-class GeneratorFilterSuite extends FunSuite {
-
-  test("CombinedFilter combines two independent filters") {
-    val f1 = new ComparisonsFilter[Int](null) {
-      def passesFilter(value: Any): Boolean = value.asInstanceOf[Int] > 5
-    }
-    val f2 = new ComparisonsFilter[Int](null) {
-      def passesFilter(value: Any): Boolean = value.asInstanceOf[Int] > 10
-    }
-
-    val pass1 = 10
-    val fail1 = 5
-    val pass2 = 20
-
-    assert(f1.passesFilter(pass1))
-    assert(!f1.passesFilter(fail1))
-    assert(f2.passesFilter(pass2))
-
-    val pass = BDGCollection(Seq(Seq(pass1), Seq(pass2)))
-    val fail = BDGCollection(Seq(Seq(fail1), Seq(pass2)))
-
-    val f12 = new CombinedFilter[Int](Seq(f1, f2))
-
-    assert(f12.passesFilter(pass))
-    assert(!f12.passesFilter(fail))
-  }
-}
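The contract this test pins down can be restated in a few lines: the i-th component filter must accept every value in the i-th slice of the collection. A plain-function sketch of that semantics, not the ComparisonsFilter/CombinedFilter API itself:

// A combined filter passes a collection of slices only if each component
// filter accepts every value in its corresponding slice.
def combinedPasses[T](filters: Seq[T => Boolean])(slices: Seq[Seq[T]]): Boolean =
  filters.zip(slices).forall { case (f, slice) => slice.forall(f) }

val f12 = combinedPasses[Int](Seq(_ > 5, _ > 10)) _
assert(f12(Seq(Seq(10), Seq(20))))  // both slices pass their filters
assert(!f12(Seq(Seq(5), Seq(20))))  // first slice fails the first filter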
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/comparisons/ComparisonTraversalEngineSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/comparisons/ComparisonTraversalEngineSuite.scala
deleted file mode 100644
index 6f4d4f3298..0000000000
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/comparisons/ComparisonTraversalEngineSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.rdd.read.comparisons
-
-import org.apache.spark.rdd.RDD
-import org.bdgenomics.adam.metrics.MappedPosition
-import org.bdgenomics.adam.projections.AlignmentRecordField
-import org.bdgenomics.adam.util.ADAMFunSuite
-import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig }
-import org.bdgenomics.utils.metrics.Histogram
-import org.bdgenomics.utils.metrics.aggregators.HistogramAggregator
-
-class ComparisonTraversalEngineSuite extends ADAMFunSuite {
-
-  sparkTest("generate works on a simple RDD") {
-    val c0 = Contig.newBuilder
-      .setContigName("chr0")
-      .build
-
-    val a0 = AlignmentRecord.newBuilder()
-      .setContig(c0)
-      .setRecordGroupName("group0")
-      .setReadName("read0")
-      .setStart(100)
-      .setPrimaryAlignment(true)
-      .setReadPaired(false)
-      .setReadMapped(true)
-      .build()
-    val a1 = AlignmentRecord.newBuilder(a0)
-      .setReadName("read1")
-      .setStart(200)
-      .build()
-
-    val b0 = AlignmentRecord.newBuilder(a0)
-      .setStart(105)
-      .build()
-    val b1 = AlignmentRecord.newBuilder(a1).build()
-
-    val a = sc.parallelize(Seq(a0, a1))
-    val b = sc.parallelize(Seq(b0, b1))
-
-    import AlignmentRecordField._
-    val fields = Seq(recordGroupId, readName, contig, start, primaryAlignment, readPaired, readMapped)
-
-    val engine = new ComparisonTraversalEngine(fields, a, b)(sc)
-
-    val generator = MappedPosition
-
-    val generated: RDD[(String, Seq[Long])] = engine.generate(generator)
-
-    val genMap = Map(generated.collect().map { case (key, value) => (key.toString, value) }: _*)
-
-    assert(genMap.size === 2)
-    assert(genMap.contains("read0"))
-    assert(genMap.contains("read1"))
-    assert(genMap("read0") === Seq(5))
-    assert(genMap("read1") === Seq(0))
-  }
-
-  sparkTest("combine works on a simple RDD") {
-    val c0 = Contig.newBuilder
-      .setContigName("chr0")
-      .build
-
-    val a0 = AlignmentRecord.newBuilder()
-      .setRecordGroupName("group0")
-      .setReadName("read0")
-      .setContig(c0)
-      .setStart(100)
-      .setPrimaryAlignment(true)
-      .setReadPaired(false)
-      .setReadMapped(true)
-      .build()
-    val a1 = AlignmentRecord.newBuilder(a0)
-      .setReadName("read1")
-      .setStart(200)
-      .build()
-    val a2 = AlignmentRecord.newBuilder(a0)
-      .setReadName("read2")
-      .setStart(300)
-      .build()
-
-    val b0 = AlignmentRecord.newBuilder(a0).build()
-    val b1 = AlignmentRecord.newBuilder(a1).build()
-    val b2 = AlignmentRecord.newBuilder(a2).setStart(305).build()
-
-    val a = sc.parallelize(Seq(a0, a1, a2))
-    val b = sc.parallelize(Seq(b0, b1, b2))
-
-    import AlignmentRecordField._
-    val fields = Seq(recordGroupId, readName, contig, start, primaryAlignment, readPaired, readMapped)
-
-    val engine = new ComparisonTraversalEngine(fields, a, b)(sc)
-
-    val generator = MappedPosition
-    val aggregator = new HistogramAggregator[Long]()
-
-    val aggregated: Histogram[Long] =
-      ComparisonTraversalEngine.combine(engine.generate(generator), aggregator)
-
-    assert(aggregated.count() === 3)
-    assert(aggregated.countIdentical() === 2)
-
-    assert(aggregated.valueToCount(0L) === 2)
-    assert(aggregated.valueToCount(5L) === 1)
-  }
-
-}
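What the engine computed can be reproduced without it: join two read sets by name, emit a per-read metric, then histogram the values. A collections-only sketch matching the numbers in the first test above; Read is an illustrative stand-in for the projected AlignmentRecord:

case class Read(name: String, start: Long)

// Join by read name and emit the start-position delta per matched read.
def generate(a: Seq[Read], b: Seq[Read]): Map[String, Long] = {
  val byName = b.map(r => r.name -> r).toMap
  a.flatMap(l => byName.get(l.name).map(r => l.name -> (r.start - l.start))).toMap
}

val deltas = generate(
  Seq(Read("read0", 100), Read("read1", 200)),
  Seq(Read("read0", 105), Read("read1", 200)))
// deltas == Map("read0" -> 5, "read1" -> 0), as the assertions above expect
val histogram = deltas.values.groupBy(identity).mapValues(_.size)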
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/GenotypeRDDFunctionsSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/GenotypeRDDFunctionsSuite.scala
deleted file mode 100644
index 5a76574b56..0000000000
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/GenotypeRDDFunctionsSuite.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.rdd.variation
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.util.ADAMFunSuite
-import org.bdgenomics.formats.avro._
-import scala.collection.JavaConversions._
-
-class GenotypeRDDFunctionsSuite extends ADAMFunSuite {
-  def v0 = Variant.newBuilder
-    .setContig(Contig.newBuilder.setContigName("11").build)
-    .setStart(17409572)
-    .setReferenceAllele("T")
-    .setAlternateAllele("C")
-    .build
-
-  sparkTest("concordance of identical and non-identical genotypes") {
-    val gb = Genotype.newBuilder().setVariant(v0)
-      .setSampleId("NA12878")
-      .setAlleles(List(GenotypeAllele.Ref, GenotypeAllele.Alt))
-
-    val g0 = gb.build
-    val g1 = gb.build
-    val tables0 = sc.parallelize(Seq(g0)).concordanceWith(sc.parallelize(Seq(g1))).collectAsMap
-    assert(tables0.size === 1)
-    val table0 = tables0.getOrElse("NA12878", ConcordanceTable())
-    assert(table0.total === 1)
-    assert(table0.get(GenotypeType.HET, GenotypeType.HET) === 1)
-
-    val g2 = gb.setAlleles(List(GenotypeAllele.Ref, GenotypeAllele.Ref)).build
-    val table1 = sc.parallelize(Seq(g0))
-      .concordanceWith(sc.parallelize(Seq(g2))).collectAsMap()
-      .getOrElse("NA12878", ConcordanceTable())
-    assert(table1.total === 1)
-    assert(table1.get(GenotypeType.HET, GenotypeType.HOM_REF) === 1)
-  }
-
-  sparkTest("concordance of identical VCFs should be 1.0") {
-    val path = ClassLoader.getSystemClassLoader.getResource("small.vcf").getFile
-
-    val gts: RDD[Genotype] = sc.loadGenotypes(path)
-    assert(gts.filter(_.getSampleId == "NA12878").count === 5)
-
-    val tables = gts.concordanceWith(gts).collectAsMap
-    assert(tables.size === 3L)
-
-    val table0 = tables.getOrElse("NA12878", ConcordanceTable())
-    assert(table0.total === 5L)
-    assert(table0.concordance === 1.0)
-  }
-}
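The 1.0 result in the second test follows directly from the definition: concordance is the diagonal of the per-sample (test, truth) table over its total. A table-level sketch of that ratio; ConcordanceTable's own accessors are not assumed here:

// Concordance of a (test, truth) -> count table: matching calls over all calls.
def concordance[T](table: Map[(T, T), Long]): Double = {
  val total = table.values.sum
  val matching = table.collect { case ((t, r), n) if t == r => n }.sum
  matching.toDouble / total
}

assert(concordance(Map(("HET", "HET") -> 5L)) == 1.0) // identical call sets
assert(concordance(Map(("HET", "HET") -> 1L, ("HET", "HOM_REF") -> 1L)) == 0.5)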
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/GenotypesSummarySuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/GenotypesSummarySuite.scala
deleted file mode 100644
index 9fa52dd8f6..0000000000
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variation/GenotypesSummarySuite.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to Big Data Genomics (BDG) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The BDG licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.bdgenomics.adam.rdd.variation
-
-import org.bdgenomics.formats.avro._
-import org.bdgenomics.adam.rdd.ADAMContext._
-import org.bdgenomics.adam.rdd.variation.GenotypesSummaryCounts.ReferenceAndAlternate
-import org.bdgenomics.adam.util.ADAMFunSuite
-
-class GenotypesSummarySuite extends ADAMFunSuite {
-
-  private def homRef = List(GenotypeAllele.Ref, GenotypeAllele.Ref)
-  private def het = List(GenotypeAllele.Alt, GenotypeAllele.Ref)
-  private def homAlt = List(GenotypeAllele.Alt, GenotypeAllele.Alt)
-  private def noCall = List(GenotypeAllele.NoCall, GenotypeAllele.NoCall)
-
-  private def variant(reference: String, alternate: String, position: Int): Variant = {
-    Variant.newBuilder()
-      .setContig(Contig.newBuilder.setContigName("chr1").build)
-      .setStart(position)
-      .setReferenceAllele(reference)
-      .setAlternateAllele(alternate)
-      .build
-  }
-
-  private def genotype(sample: String, variant: Variant, alleles: List[GenotypeAllele]) = {
-    Genotype.newBuilder()
-      .setSampleId(sample)
-      .setVariant(variant)
-      .setAlleles(alleles)
-  }
-
-  private def summarize(genotypes: Seq[Genotype]): GenotypesSummary = {
-    val rdd = sc.parallelize(genotypes)
-    GenotypesSummary(rdd)
-  }
-
-  sparkTest("simple genotypes summary") {
-    val genotypes = List(
-      genotype("alice", variant("A", "TT", 2), het).build,
-      genotype("alice", variant("G", "A", 4), het).build,
-      genotype("alice", variant("G", "C", 1), homRef).build,
-      genotype("alice", variant("G", "T", 0), homAlt).build,
-      genotype("alice", variant("GGG", "T", 7), het).build,
-      genotype("alice", variant("T", "AA", 9), het).build,
-      genotype("alice", variant("TT", "AA", 12), het).build,
-      genotype("bob", variant("A", "TT", 2), het).build,
-      genotype("bob", variant("A", "T", 3), het).build,
-      genotype("bob", variant("A", "T", 9), het).build,
-      genotype("bob", variant("T", "C", 4), het).setIsPhased(true).build,
-      genotype("bob", variant("T", "A", 7), homRef).build,
-      genotype("bob", variant("T", "G", 8), homAlt).build,
-      genotype("bob", variant("T", "G", 12), noCall).build,
-      genotype("empty", variant("T", "G", 12), noCall).build)
-
-    val stats = summarize(genotypes)
-    assert(stats.perSampleStatistics.size == 3)
-
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCount == 0)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCounts(ReferenceAndAlternate("A", "T")) == 0)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCounts(ReferenceAndAlternate("T", "C")) == 0)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCounts(ReferenceAndAlternate("T", "G")) == 0)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCounts(ReferenceAndAlternate("C", "T")) == 0)
-    assert(stats.perSampleStatistics("empty").insertionCount == 0)
-    assert(stats.perSampleStatistics("empty").noCallCount == 1)
-    assert(stats.perSampleStatistics("empty").phasedCount == 0)
-
-    assert(stats.perSampleStatistics("alice").singleNucleotideVariantCount == 2)
-    assert(stats.perSampleStatistics("alice").singleNucleotideVariantCounts(ReferenceAndAlternate("G", "A")) == 1)
-    assert(stats.perSampleStatistics("alice").singleNucleotideVariantCounts(ReferenceAndAlternate("G", "C")) == 0)
-    assert(stats.perSampleStatistics("alice").singleNucleotideVariantCounts(ReferenceAndAlternate("G", "T")) == 1)
-    assert(stats.perSampleStatistics("alice").singleNucleotideVariantCounts(ReferenceAndAlternate("A", "T")) == 0)
-    assert(stats.perSampleStatistics("alice").deletionCount == 1)
-    assert(stats.perSampleStatistics("alice").insertionCount == 2)
-    assert(stats.perSampleStatistics("alice").multipleNucleotideVariantCount == 1)
-    assert(stats.perSampleStatistics("alice").phasedCount == 0)
-    assert(stats.perSampleStatistics("alice").noCallCount == 0)
-    assert(stats.perSampleStatistics("bob").singleNucleotideVariantCount == 4)
-    assert(stats.perSampleStatistics("bob").singleNucleotideVariantCounts(ReferenceAndAlternate("A", "T")) == 2)
-    assert(stats.perSampleStatistics("bob").singleNucleotideVariantCounts(ReferenceAndAlternate("T", "C")) == 1)
-    assert(stats.perSampleStatistics("bob").singleNucleotideVariantCounts(ReferenceAndAlternate("T", "G")) == 1)
-    assert(stats.perSampleStatistics("bob").singleNucleotideVariantCounts(ReferenceAndAlternate("C", "T")) == 0)
-    assert(stats.perSampleStatistics("bob").insertionCount == 1)
-    assert(stats.perSampleStatistics("bob").noCallCount == 1)
-    assert(stats.perSampleStatistics("bob").phasedCount == 1)
-
-    assert(stats.aggregateStatistics.singleNucleotideVariantCount == 6)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("G", "A")) == 1)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("G", "C")) == 0)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("G", "T")) == 1)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("A", "T")) == 2)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("T", "C")) == 1)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("T", "G")) == 1)
-    assert(stats.aggregateStatistics.insertionCount == 3)
-    assert(stats.aggregateStatistics.deletionCount == 1)
-    assert(stats.aggregateStatistics.multipleNucleotideVariantCount == 1)
-
-    assert(stats.singletonCount == 9)
-    assert(stats.distinctVariantCount == 10)
-
-    // Test that the formatting functions do not throw.
-    GenotypesSummaryFormatting.format_csv(stats)
-    GenotypesSummaryFormatting.format_human_readable(stats)
-  }
-
-  sparkTest("empty genotypes summary") {
-    val genotypes = List(
-      genotype("empty", variant("T", "G", 12), noCall).build)
-
-    val stats = summarize(genotypes)
-
-    assert(stats.perSampleStatistics.size == 1)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCount == 0)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCounts(ReferenceAndAlternate("A", "T")) == 0)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCounts(ReferenceAndAlternate("T", "C")) == 0)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCounts(ReferenceAndAlternate("T", "G")) == 0)
-    assert(stats.perSampleStatistics("empty").singleNucleotideVariantCounts(ReferenceAndAlternate("C", "T")) == 0)
-    assert(stats.perSampleStatistics("empty").insertionCount == 0)
-    assert(stats.perSampleStatistics("empty").noCallCount == 1)
-    assert(stats.perSampleStatistics("empty").phasedCount == 0)
-
-    assert(stats.aggregateStatistics.singleNucleotideVariantCount == 0)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("G", "A")) == 0)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("G", "C")) == 0)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("G", "T")) == 0)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("A", "T")) == 0)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("T", "C")) == 0)
-    assert(stats.aggregateStatistics.singleNucleotideVariantCounts(ReferenceAndAlternate("T", "G")) == 0)
-    assert(stats.aggregateStatistics.insertionCount == 0)
-    assert(stats.aggregateStatistics.deletionCount == 0)
-    assert(stats.aggregateStatistics.multipleNucleotideVariantCount == 0)
-
-    assert(stats.singletonCount == 0)
-    assert(stats.distinctVariantCount == 0)
-
-    // Test that the formatting functions do not throw.
-    GenotypesSummaryFormatting.format_csv(stats)
-    GenotypesSummaryFormatting.format_human_readable(stats)
-  }
-}
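Finally, the singleton and distinct-variant figures asserted in this suite are plain key counting, which downstream users can reproduce without SummarizeGenotypes. A sketch of that bookkeeping; the sample/variant tuples are illustrative:

// Each element is (sample, variant site); a variant is a singleton when it is
// called in exactly one sample.
val calls = Seq(
  ("alice", ("chr1", 2L)), // shared with bob
  ("bob", ("chr1", 2L)),
  ("bob", ("chr1", 3L)))   // bob-only singleton
val samplesPerVariant = calls.distinct.groupBy(_._2).mapValues(_.map(_._1).size)
val distinctVariantCount = samplesPerVariant.size        // 2
val singletonCount = samplesPerVariant.count(_._2 == 1)  // 1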