Merge b32a60b into b8e36b2
jpdna committed Apr 2, 2016
2 parents b8e36b2 + b32a60b commit b350f9c
Showing 40 changed files with 132 additions and 191 deletions.
1 change: 1 addition & 0 deletions adam-apis/pom.xml
@@ -106,6 +106,7 @@
<dependency>
<groupId>org.bdgenomics.bdg-formats</groupId>
<artifactId>bdg-formats</artifactId>
<version>0.7.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.bdgenomics.adam</groupId>
1 change: 1 addition & 0 deletions adam-cli/pom.xml
@@ -167,6 +167,7 @@
<dependency>
<groupId>org.bdgenomics.bdg-formats</groupId>
<artifactId>bdg-formats</artifactId>
<version>0.7.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.bdgenomics.adam</groupId>
@@ -54,4 +54,4 @@ class Fragments2Reads(protected val args: Fragments2ReadsArgs) extends BDGSparkC
SequenceDictionary.empty,
RecordGroupDictionary.empty)
}
}
}
@@ -32,7 +32,7 @@ import org.bdgenomics.adam.models.{
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Filter }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.read.MDTagging
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, MDTagging }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.formats.avro.AlignmentRecord
import org.bdgenomics.utils.cli._
@@ -267,7 +267,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
)
}

val aRdd =
val aRdd: AlignmentRecordRDD =
if (args.forceLoadBam) {
sc.loadBam(args.inputPath)
} else if (args.forceLoadFastq) {
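The only change in this Transform hunk is the explicit type ascription on aRdd. A minimal, self-contained sketch of the same Scala idiom, using hypothetical Loaded/BamLoaded/FastqLoaded types rather than ADAM's AlignmentRecordRDD, to show why ascribing the common supertype is useful when the branches of an if/else chain return different subtypes:

    // Hypothetical stand-ins for AlignmentRecordRDD and the types returned by
    // sc.loadBam / sc.loadFastq; they are not ADAM classes.
    sealed trait Loaded { def records: Long }
    final case class BamLoaded(records: Long) extends Loaded
    final case class FastqLoaded(records: Long) extends Loaded

    def load(forceLoadBam: Boolean, forceLoadFastq: Boolean): Loaded = {
      // Ascribing Loaded documents the intended type and keeps the compiler from
      // inferring an overly specific or unexpectedly wide least upper bound.
      val loaded: Loaded =
        if (forceLoadBam) BamLoaded(100L)
        else if (forceLoadFastq) FastqLoaded(200L)
        else BamLoaded(0L)
      loaded
    }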
1 change: 1 addition & 0 deletions adam-core/pom.xml
@@ -118,6 +118,7 @@
<dependency>
<groupId>org.bdgenomics.bdg-formats</groupId>
<artifactId>bdg-formats</artifactId>
<version>0.7.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
@@ -70,7 +70,7 @@ class ConsensusGeneratorFromKnowns(file: String, @transient sc: SparkContext) ex
// get region
val start = reads.map(_.record.getStart).min
val end = reads.map(_.getEnd).max
val refId = reads.head.record.getContig.getContigName
val refId = reads.head.record.getContigName

val region = ReferenceRegion(refId, start, end + 1)

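This hunk is the first of many with the same shape: bdg-formats 0.7.x flattens the nested Contig record on AlignmentRecord into a plain contigName string, so call sites drop the intermediate getContig. A hedged sketch of reading and writing the flattened field (field names as in this diff; the literal values are invented):

    import org.bdgenomics.formats.avro.AlignmentRecord

    val read: AlignmentRecord = AlignmentRecord.newBuilder()
      .setContigName("chr1")   // previously carried as a nested Contig record
      .setStart(10000L)
      .setEnd(10100L)
      .setReadMapped(true)
      .build()

    // Old accessor chain: read.getContig.getContigName
    // New accessor:       read.getContigName
    val refName: String = read.getContigName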
@@ -78,7 +78,7 @@ class ConsensusGeneratorFromReads extends ConsensusGenerator {
Consensus.generateAlternateConsensus(
r.getSequence,
ReferencePosition(
r.getContig.getContigName,
r.getContigName,
r.getStart
),
r.samtoolsCigar
@@ -118,8 +118,7 @@ class AlignmentRecordConverter extends Serializable {
})

// set the reference name, and alignment position, for mate
Option(adamRecord.getMateContig)
.map(_.getContigName)
Option(adamRecord.getMateContigName)
.foreach(builder.setMateReferenceName)
Option(adamRecord.getMateAlignmentStart)
.foreach(s => builder.setMateAlignmentStart(s.toInt + 1))
@@ -155,8 +154,8 @@
// only set alignment flags if read is aligned
if (m) {
// if we are aligned, we must have a reference
assert(adamRecord.getContig != null, "Cannot have null contig if aligned.")
builder.setReferenceName(adamRecord.getContig.getContigName)
assert(adamRecord.getContigName != null, "Cannot have null contig if aligned.")
builder.setReferenceName(adamRecord.getContigName)

// set the cigar, if provided
Option(adamRecord.getCigar).foreach(builder.setCigarString)
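The converter hunks rely on unset Avro fields coming back as null; wrapping a getter in Option turns null into None, so the mate reference name and start are copied onto the SAM record only when they are actually present. A tiny stand-alone illustration of that pattern, with plain values instead of ADAM or htsjdk types:

    // Option(x) is None when x is null and Some(x) otherwise, so foreach only
    // runs the setter-like side effect for fields that are populated.
    val unsetField: String = null
    val setField: String = "chr2"

    Option(unsetField).foreach(name => println(s"mate reference: $name")) // prints nothing
    Option(setField).foreach(name => println(s"mate reference: $name"))   // prints "mate reference: chr2"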
@@ -93,7 +93,7 @@ object FragmentConverter extends Serializable {

// build record
AlignmentRecord.newBuilder()
.setContig(contig)
.setContigName(contig.getContigName)
.setStart(fragmentRegion.start)
.setEnd(fragmentRegion.end)
.setSequence(fragmentString)
@@ -81,7 +81,7 @@ class SAMRecordConverter extends Serializable with Logging {
val readReference: Int = samRecord.getReferenceIndex
if (readReference != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
dict(samRecord.getReferenceName).foreach { (rec) =>
builder.setContig(SequenceRecord.toADAMContig(rec))
builder.setContigName(SequenceRecord.toADAMContig(rec).getContigName)
}

// set read alignment flag
@@ -128,7 +128,7 @@

if (mateReference != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
dict(samRecord.getMateReferenceName).foreach { (rec) =>
builder.setMateContig(SequenceRecord.toADAMContig(rec))
builder.setMateContigName(SequenceRecord.toADAMContig(rec).getContigName)
}

val mateStart = samRecord.getMateAlignmentStart
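The SAMRecordConverter and FragmentConverter hunks show the write side of the migration: rather than attaching a whole Contig to the read, only its name is copied. A hedged sketch of that projection (the Contig here is invented; in the converter it comes from SequenceRecord.toADAMContig):

    import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig }

    // A stand-in for the Contig produced from the sequence dictionary entry.
    val contig: Contig = Contig.newBuilder()
      .setContigName("chr3")
      .build()

    val builder = AlignmentRecord.newBuilder()

    // Old: builder.setContig(contig)
    // New: the read keeps only the reference name.
    builder.setContigName(contig.getContigName)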
@@ -49,7 +49,7 @@ object ReferencePosition extends Serializable {
* @see fivePrime
*/
def apply(record: AlignmentRecord): ReferencePosition = {
new ReferencePosition(record.getContig.getContigName, record.getStart)
new ReferencePosition(record.getContigName, record.getStart)
}

/**
@@ -82,7 +82,7 @@ object ReferenceRegion {
if (record.getReadMapped) {
Some(
ReferenceRegion(
record.getContig.getContigName,
record.getContigName,
record.getStart,
record.getEnd
)
@@ -92,7 +92,7 @@
}

def apply(record: AlignmentRecord): ReferenceRegion = {
ReferenceRegion(record.getContig.getContigName, record.getStart, record.getEnd)
ReferenceRegion(record.getContigName, record.getStart, record.getEnd)
}

/**
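ReferencePosition and ReferenceRegion are now built straight from the read's contigName. A hedged usage sketch over a minimal mapped record (apply signatures as shown in these hunks; the values are invented):

    import org.bdgenomics.adam.models.{ ReferencePosition, ReferenceRegion }
    import org.bdgenomics.formats.avro.AlignmentRecord

    val mapped: AlignmentRecord = AlignmentRecord.newBuilder()
      .setContigName("chr1")
      .setStart(10000L)
      .setEnd(10100L)
      .setReadMapped(true)
      .build()

    // Both models read the flat contigName field directly.
    val pos: ReferencePosition = ReferencePosition(mapped)   // chr1 @ 10000
    val region: ReferenceRegion = ReferenceRegion(mapped)    // chr1:[10000, 10100)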
@@ -345,27 +345,6 @@ object SequenceRecord {
fromADAMContig(fragment.getContig)
}

/**
* Convert an Read into one or more SequenceRecords.
* The reason that we can't simply use the "fromSpecificRecord" method, below, is that each Read
* can (through the fact that it could be a pair of reads) contain 1 or 2 possible SequenceRecord entries
* for the SequenceDictionary itself. Both have to be extracted, separately.
*
* @param rec The Read from which to extract the SequenceRecord entries
* @return a list of all SequenceRecord entries derivable from this record.
*/
def fromADAMRecord(rec: AlignmentRecord): Set[SequenceRecord] = {
assert(rec != null, "Read was null")
if (rec.getContig != null || rec.getMateContig != null) {
// The contig should be null for unmapped read
List(Option(rec.getContig), Option(rec.getMateContig))
.flatten
.map(fromADAMContig)
.toSet
} else
Set()
}

def fromSpecificRecord(rec: IndexedRecord): SequenceRecord = {
val schema = rec.getSchema
if (schema.getField("referenceId") != null) {
@@ -381,4 +360,6 @@
throw new AssertionError("Missing information to generate SequenceRecord")
}
}

}
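fromADAMRecord goes away because a flattened read carries only a contig name, which is not enough to populate a SequenceRecord (length, MD5, URL); contig fragments still carry a full Contig, so dictionary entries for reads now have to come from a real dictionary or header instead. A hedged sketch of the path that remains, assuming fromADAMContig stays publicly accessible as the surrounding code suggests (the Contig values are invented):

    import org.bdgenomics.adam.models.SequenceRecord
    import org.bdgenomics.formats.avro.Contig

    val ctg: Contig = Contig.newBuilder()
      .setContigName("chr1")
      .setContigLength(249250621L)
      .build()

    // Derive a dictionary entry from a full Contig record, not from a read.
    val entry: SequenceRecord = SequenceRecord.fromADAMContig(ctg)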

@@ -229,9 +229,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val projected: RDD[T] = loadParquet[T](filePath, None, projection = Some(projection))

val recs: RDD[SequenceRecord] =
if (isADAMRecord) {
projected.asInstanceOf[RDD[AlignmentRecord]].distinct().flatMap(rec => SequenceRecord.fromADAMRecord(rec))
} else if (isADAMContig) {
if (isADAMContig) {
projected.asInstanceOf[RDD[NucleotideContigFragment]].distinct().map(ctg => SequenceRecord.fromADAMContigFragment(ctg))
} else {
projected.distinct().map(SequenceRecord.fromSpecificRecord(_))
@@ -21,18 +21,14 @@ import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.AlignmentRecordConverter
import org.bdgenomics.adam.models.SequenceRecord
import org.bdgenomics.adam.rdd.ADAMSequenceDictionaryRDDAggregator
import org.apache.spark.Logging
import org.bdgenomics.formats.avro._
import scala.collection.JavaConversions._

class FragmentRDDFunctions(rdd: RDD[Fragment]) extends ADAMSequenceDictionaryRDDAggregator[Fragment](rdd) {
class FragmentRDDFunctions(rdd: RDD[Fragment]) extends Serializable with Logging {

def toReads: RDD[AlignmentRecord] = {
val converter = new AlignmentRecordConverter
rdd.flatMap(converter.convertFragment)
}

def getSequenceRecordsFromElement(elem: Fragment): Set[SequenceRecord] = {
val alignments = asScalaBuffer(elem.getAlignments)
alignments.flatMap(SequenceRecord.fromADAMRecord).toSet
}
}
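With the aggregator base class and getSequenceRecordsFromElement gone, FragmentRDDFunctions is just a serializable, logging wrapper over RDD[Fragment] whose remaining job is converting fragments back to reads. A hedged usage sketch (direct construction shown; in ADAM these functions are normally picked up through the implicit conversions in ADAMContext, and the package of FragmentRDDFunctions is assumed, not shown in this diff):

    import org.apache.spark.rdd.RDD
    import org.bdgenomics.adam.rdd.fragment.FragmentRDDFunctions // assumed location
    import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment }

    def fragmentsToReads(fragments: RDD[Fragment]): RDD[AlignmentRecord] =
      new FragmentRDDFunctions(fragments).toReads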
@@ -32,7 +32,7 @@ import org.apache.avro.specific.{ SpecificDatumWriter, SpecificRecordBase }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, FileUtil, Path }
import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.{ Logging, SparkContext }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
@@ -53,9 +53,7 @@ import scala.annotation.tailrec
import scala.language.implicitConversions
import scala.reflect.ClassTag

class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
extends ADAMSequenceDictionaryRDDAggregator[AlignmentRecord](rdd) {

class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) extends Serializable with Logging {
/**
* Calculates the subset of the RDD whose AlignmentRecords overlap the corresponding
* query ReferenceRegion. Equality of the reference sequence (to which these are aligned)
@@ -72,7 +70,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
def filterByOverlappingRegion(query: ReferenceRegion): RDD[AlignmentRecord] = {
def overlapsQuery(rec: AlignmentRecord): Boolean =
rec.getReadMapped &&
rec.getContig.getContigName == query.referenceName &&
rec.getContigName == query.referenceName &&
rec.getStart < query.end &&
rec.getEnd > query.start
rdd.filter(overlapsQuery)
@@ -126,7 +124,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* As such, we must force the user to pass in the schema.
*
* @tparam T The type of the specific record we are saving.
*
* @param filename Path to save records to.
* @param sc SparkContext used for identifying underlying file system.
* @param schema Schema of records we are saving.
@@ -171,7 +168,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamSave
* @see adamAlignedRecordSave
*/
@@ -212,7 +208,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamSave
* @see adamSAMSave
* @see saveAsParquet
@@ -235,7 +230,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamAlignedRecordSave
* @see adamSAMSave
* @see saveAsParquet
@@ -263,9 +257,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @return A string on the driver representing this RDD of reads in SAM format.
*
* @see adamConvertToSAM
*/
def adamSAMString(sd: SequenceDictionary,
@@ -593,10 +585,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
}
}

def getSequenceRecordsFromElement(elem: AlignmentRecord): Set[SequenceRecord] = {
SequenceRecord.fromADAMRecord(elem).toSet
}

/**
* Converts an RDD of ADAM read records into SAM records.
*
@@ -635,7 +623,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
*
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns an RDD containing k-mer/count pairs.
*
* @see adamCountQmers
*/
def adamCountKmers(kmerLength: Int): RDD[(String, Long)] = {
@@ -696,15 +683,13 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* Realigns indels using a consensus-based heuristic.
*
* @see RealignIndels
*
* @param isSorted If the input data is sorted, setting this parameter to true avoids a second sort.
* @param maxIndelSize The size of the largest indel to use for realignment.
* @param maxConsensusNumber The maximum number of consensus sequences to realign against per
* target region.
* @param lodThreshold Log-odds threshold to use when realigning; realignments are only finalized
* if the log-odds threshold is exceeded.
* @param maxTargetSize The maximum width of a single target region for realignment.
*
* @return Returns an RDD of mapped reads which have been realigned.
*/
def adamRealignIndels(
@@ -724,6 +709,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])

/**
* Groups all reads by record group and read name
*
* @return SingleReadBuckets with primary, secondary and unmapped reads
*/
def adamSingleReadBuckets(): RDD[SingleReadBucket] = {
@@ -759,6 +745,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])

/**
* Returns the subset of the ADAMRecords which have an attribute with the given name.
*
* @param tagName The name of the attribute to filter on (should be length 2)
* @return An RDD[Read] containing the subset of records with a tag that matches the given name.
*/
@@ -928,7 +915,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* were _originally_ paired together.
*
* @note The RDD that this is called on should be the RDD with the first read from the pair.
*
* @param secondPairRdd The rdd containing the second read from the pairs.
* @param validationStringency How stringently to validate the reads.
* @return Returns an RDD with the pair information recomputed.
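Among the AlignmentRecordRDDFunctions changes above, filterByOverlappingRegion now compares the query's referenceName against the read's flat contigName. A hedged usage sketch (direct construction again; in practice the implicit conversions in ADAMContext._ supply these functions, and the region values are invented):

    import org.apache.spark.rdd.RDD
    import org.bdgenomics.adam.models.ReferenceRegion
    import org.bdgenomics.adam.rdd.read.AlignmentRecordRDDFunctions
    import org.bdgenomics.formats.avro.AlignmentRecord

    def readsInWindow(reads: RDD[AlignmentRecord]): RDD[AlignmentRecord] = {
      val query = ReferenceRegion("chr1", 100000L, 101000L)
      // Keeps mapped reads whose contigName matches the query and whose
      // [start, end) interval overlaps it.
      new AlignmentRecordRDDFunctions(reads).filterByOverlappingRegion(query)
    }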
@@ -44,7 +44,7 @@ object DuplicateMetrics {
b2i(f(record)),
b2i(f(record) && record.getReadMapped && record.getMateMapped),
b2i(f(record) && record.getReadMapped && !record.getMateMapped),
b2i(f(record) && (!isSameContig(record.getContig, record.getMateContig)))
b2i(f(record) && (!isSameContig(record.getContigName, record.getMateContigName)))
)
}
(duplicateMetrics(isPrimary), duplicateMetrics(isSecondary))
@@ -97,7 +97,7 @@ object FlagStat {
rdd.map {
p =>
val mateMappedToDiffChromosome =
p.getReadPaired && p.getReadMapped && p.getMateMapped && !isSameContig(p.getContig, p.getMateContig)
p.getReadPaired && p.getReadMapped && p.getMateMapped && !isSameContig(p.getContigName, p.getMateContigName)
val (primaryDuplicates, secondaryDuplicates) = DuplicateMetrics(p)
new FlagStatMetrics(
1,
@@ -72,8 +72,7 @@ case class MDTagging(
val referenceFileB = sc.broadcast(referenceFile)
reads.map(read => {
(for {
contig <- Option(read.getContig)
contigName <- Option(contig.getContigName)
contig <- Option(read.getContigName)
if read.getReadMapped
} yield {
maybeMDTagRead(read, referenceFileB.value.extract(ReferenceRegion(read)))
@@ -105,5 +104,5 @@ object MDTagging {

case class IncorrectMDTagException(read: AlignmentRecord, mdTag: String) extends Exception {
override def getMessage: String =
s"Read: ${read.getReadName}, pos: ${read.getContig.getContigName}:${read.getStart}, cigar: ${read.getCigar}, existing MD tag: ${read.getMismatchingPositions}, correct MD tag: $mdTag"
s"Read: ${read.getReadName}, pos: ${read.getContigName}:${read.getStart}, cigar: ${read.getCigar}, existing MD tag: ${read.getMismatchingPositions}, correct MD tag: $mdTag"
}
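The MD-tagging hunk keeps its original shape: a for-comprehension over an Option with a guard, so unmapped reads and reads without a contig name yield None and are presumably passed through untagged. A tiny stand-alone illustration of that control flow, with plain values instead of reads:

    def describe(contigName: String, readMapped: Boolean): String =
      (for {
        contig <- Option(contigName) // None when the field is unset (null)
        if readMapped                // guard: only mapped reads are tagged
      } yield s"tag against $contig").getOrElse("left unchanged")

    describe("chr1", readMapped = true)   // "tag against chr1"
    describe(null, readMapped = true)     // "left unchanged"
    describe("chr1", readMapped = false)  // "left unchanged"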
@@ -102,7 +102,7 @@ object IndelRealignmentTarget {
maxIndelSize: Int): Seq[IndelRealignmentTarget] = CreateIndelRealignmentTargets.time {

val region = ReferenceRegion(read.record)
val refId = read.record.getContig.getContigName
val refId = read.record.getContigName
var pos = List[ReferenceRegion]()
var referencePos = read.record.getStart
val cigar = read.samtoolsCigar
@@ -251,7 +251,7 @@ private[rdd] class RealignIndels(

// get reference from reads
val (reference, refStart, refEnd) = getReferenceFromReads(reads.map(r => new RichAlignmentRecord(r)))
val refRegion = ReferenceRegion(reads.head.record.getContig.getContigName, refStart, refEnd)
val refRegion = ReferenceRegion(reads.head.record.getContigName, refStart, refEnd)

// preprocess reads and get consensus
val readsToClean = consensusModel.preprocessReadsForRealignment(