Skip to content

Commit

Permalink
Replaced Contig with ContigName in AlignmentRecord and related changes
Browse files Browse the repository at this point in the history
Contig factor out project, cleaning up some comments

clean up some unintended whitespace arbitrary diffs

Removed subproject POM changes and other small issues
  • Loading branch information
jpdna committed Apr 5, 2016
1 parent b8e36b2 commit c924952
Show file tree
Hide file tree
Showing 37 changed files with 129 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ class Fragments2Reads(protected val args: Fragments2ReadsArgs) extends BDGSparkC
SequenceDictionary.empty,
RecordGroupDictionary.empty)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.bdgenomics.adam.models.{
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Filter }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.read.MDTagging
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, MDTagging }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.formats.avro.AlignmentRecord
import org.bdgenomics.utils.cli._
Expand Down Expand Up @@ -267,7 +267,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
)
}

val aRdd =
val aRdd: AlignmentRecordRDD =
if (args.forceLoadBam) {
sc.loadBam(args.inputPath)
} else if (args.forceLoadFastq) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ConsensusGeneratorFromKnowns(file: String, @transient sc: SparkContext) ex
// get region
val start = reads.map(_.record.getStart).min
val end = reads.map(_.getEnd).max
val refId = reads.head.record.getContig.getContigName
val refId = reads.head.record.getContigName

val region = ReferenceRegion(refId, start, end + 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ConsensusGeneratorFromReads extends ConsensusGenerator {
Consensus.generateAlternateConsensus(
r.getSequence,
ReferencePosition(
r.getContig.getContigName,
r.getContigName,
r.getStart
),
r.samtoolsCigar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ class AlignmentRecordConverter extends Serializable {
})

// set the reference name, and alignment position, for mate
Option(adamRecord.getMateContig)
.map(_.getContigName)
Option(adamRecord.getMateContigName)
.foreach(builder.setMateReferenceName)
Option(adamRecord.getMateAlignmentStart)
.foreach(s => builder.setMateAlignmentStart(s.toInt + 1))
Expand Down Expand Up @@ -155,8 +154,8 @@ class AlignmentRecordConverter extends Serializable {
// only set alignment flags if read is aligned
if (m) {
// if we are aligned, we must have a reference
assert(adamRecord.getContig != null, "Cannot have null contig if aligned.")
builder.setReferenceName(adamRecord.getContig.getContigName)
require(adamRecord.getContigName != null, "Cannot have null contig if aligned.")
builder.setReferenceName(adamRecord.getContigName)

// set the cigar, if provided
Option(adamRecord.getCigar).foreach(builder.setCigarString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object FragmentConverter extends Serializable {

// build record
AlignmentRecord.newBuilder()
.setContig(contig)
.setContigName(contig.getContigName)
.setStart(fragmentRegion.start)
.setEnd(fragmentRegion.end)
.setSequence(fragmentString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ class SAMRecordConverter extends Serializable with Logging {
// This prevents looking up a -1 in the sequence dictionary
val readReference: Int = samRecord.getReferenceIndex
if (readReference != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
dict(samRecord.getReferenceName).foreach { (rec) =>
builder.setContig(SequenceRecord.toADAMContig(rec))
}
builder.setContigName(samRecord.getReferenceName)

// set read alignment flag
val start: Int = samRecord.getAlignmentStart
Expand Down Expand Up @@ -127,9 +125,7 @@ class SAMRecordConverter extends Serializable with Logging {
val mateReference: Int = samRecord.getMateReferenceIndex

if (mateReference != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
dict(samRecord.getMateReferenceName).foreach { (rec) =>
builder.setMateContig(SequenceRecord.toADAMContig(rec))
}
builder.setMateContigName(samRecord.getMateReferenceName)

val mateStart = samRecord.getMateAlignmentStart
if (mateStart > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object ReferencePosition extends Serializable {
* @see fivePrime
*/
def apply(record: AlignmentRecord): ReferencePosition = {
new ReferencePosition(record.getContig.getContigName, record.getStart)
new ReferencePosition(record.getContigName, record.getStart)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object ReferenceRegion {
if (record.getReadMapped) {
Some(
ReferenceRegion(
record.getContig.getContigName,
record.getContigName,
record.getStart,
record.getEnd
)
Expand All @@ -92,7 +92,7 @@ object ReferenceRegion {
}

def apply(record: AlignmentRecord): ReferenceRegion = {
ReferenceRegion(record.getContig.getContigName, record.getStart, record.getEnd)
ReferenceRegion(record.getContigName, record.getStart, record.getEnd)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,27 +345,6 @@ object SequenceRecord {
fromADAMContig(fragment.getContig)
}

/**
* Convert an Read into one or more SequenceRecords.
* The reason that we can't simply use the "fromSpecificRecord" method, below, is that each Read
* can (through the fact that it could be a pair of reads) contain 1 or 2 possible SequenceRecord entries
* for the SequenceDictionary itself. Both have to be extracted, separately.
*
* @param rec The Read from which to extract the SequenceRecord entries
* @return a list of all SequenceRecord entries derivable from this record.
*/
def fromADAMRecord(rec: AlignmentRecord): Set[SequenceRecord] = {
assert(rec != null, "Read was null")
if (rec.getContig != null || rec.getMateContig != null) {
// The contig should be null for unmapped read
List(Option(rec.getContig), Option(rec.getMateContig))
.flatten
.map(fromADAMContig)
.toSet
} else
Set()
}

def fromSpecificRecord(rec: IndexedRecord): SequenceRecord = {
val schema = rec.getSchema
if (schema.getField("referenceId") != null) {
Expand All @@ -381,4 +360,6 @@ object SequenceRecord {
throw new AssertionError("Missing information to generate SequenceRecord")
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val projected: RDD[T] = loadParquet[T](filePath, None, projection = Some(projection))

val recs: RDD[SequenceRecord] =
if (isADAMRecord) {
projected.asInstanceOf[RDD[AlignmentRecord]].distinct().flatMap(rec => SequenceRecord.fromADAMRecord(rec))
} else if (isADAMContig) {
if (isADAMContig) {
projected.asInstanceOf[RDD[NucleotideContigFragment]].distinct().map(ctg => SequenceRecord.fromADAMContigFragment(ctg))
} else {
projected.distinct().map(SequenceRecord.fromSpecificRecord(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,14 @@ import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.AlignmentRecordConverter
import org.bdgenomics.adam.models.SequenceRecord
import org.bdgenomics.adam.rdd.ADAMSequenceDictionaryRDDAggregator
import org.apache.spark.Logging
import org.bdgenomics.formats.avro._
import scala.collection.JavaConversions._

class FragmentRDDFunctions(rdd: RDD[Fragment]) extends ADAMSequenceDictionaryRDDAggregator[Fragment](rdd) {
class FragmentRDDFunctions(rdd: RDD[Fragment]) extends Serializable with Logging {

def toReads: RDD[AlignmentRecord] = {
val converter = new AlignmentRecordConverter
rdd.flatMap(converter.convertFragment)
}

def getSequenceRecordsFromElement(elem: Fragment): Set[SequenceRecord] = {
val alignments = asScalaBuffer(elem.getAlignments)
alignments.flatMap(SequenceRecord.fromADAMRecord).toSet
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.avro.specific.{ SpecificDatumWriter, SpecificRecordBase }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, FileUtil, Path }
import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.{ Logging, SparkContext }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
Expand All @@ -53,9 +53,7 @@ import scala.annotation.tailrec
import scala.language.implicitConversions
import scala.reflect.ClassTag

class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
extends ADAMSequenceDictionaryRDDAggregator[AlignmentRecord](rdd) {

class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) extends Serializable with Logging {
/**
* Calculates the subset of the RDD whose AlignmentRecords overlap the corresponding
* query ReferenceRegion. Equality of the reference sequence (to which these are aligned)
Expand All @@ -72,7 +70,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
def filterByOverlappingRegion(query: ReferenceRegion): RDD[AlignmentRecord] = {
def overlapsQuery(rec: AlignmentRecord): Boolean =
rec.getReadMapped &&
rec.getContig.getContigName == query.referenceName &&
rec.getContigName == query.referenceName &&
rec.getStart < query.end &&
rec.getEnd > query.start
rdd.filter(overlapsQuery)
Expand Down Expand Up @@ -126,7 +124,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* As such, we must force the user to pass in the schema.
*
* @tparam T The type of the specific record we are saving.
*
* @param filename Path to save records to.
* @param sc SparkContext used for identifying underlying file system.
* @param schema Schema of records we are saving.
Expand Down Expand Up @@ -171,7 +168,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamSave
* @see adamAlignedRecordSave
*/
Expand Down Expand Up @@ -212,7 +208,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamSave
* @see adamSAMSave
* @see saveAsParquet
Expand All @@ -235,7 +230,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamAlignedRecordSave
* @see adamSAMSave
* @see saveAsParquet
Expand Down Expand Up @@ -263,9 +257,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @return A string on the driver representing this RDD of reads in SAM format.
*
* @see adamConvertToSAM
*/
def adamSAMString(sd: SequenceDictionary,
Expand Down Expand Up @@ -593,10 +585,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
}
}

def getSequenceRecordsFromElement(elem: AlignmentRecord): Set[SequenceRecord] = {
SequenceRecord.fromADAMRecord(elem).toSet
}

/**
* Converts an RDD of ADAM read records into SAM records.
*
Expand Down Expand Up @@ -635,7 +623,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
*
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns an RDD containing k-mer/count pairs.
*
* @see adamCountQmers
*/
def adamCountKmers(kmerLength: Int): RDD[(String, Long)] = {
Expand Down Expand Up @@ -696,15 +683,13 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* Realigns indels using a concensus-based heuristic.
*
* @see RealignIndels
*
* @param isSorted If the input data is sorted, setting this parameter to true avoids a second sort.
* @param maxIndelSize The size of the largest indel to use for realignment.
* @param maxConsensusNumber The maximum number of consensus sequences to realign against per
* target region.
* @param lodThreshold Log-odds threhold to use when realigning; realignments are only finalized
* if the log-odds threshold is exceeded.
* @param maxTargetSize The maximum width of a single target region for realignment.
*
* @return Returns an RDD of mapped reads which have been realigned.
*/
def adamRealignIndels(
Expand All @@ -724,6 +709,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])

/**
* Groups all reads by record group and read name
*
* @return SingleReadBuckets with primary, secondary and unmapped reads
*/
def adamSingleReadBuckets(): RDD[SingleReadBucket] = {
Expand Down Expand Up @@ -759,6 +745,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])

/**
* Returns the subset of the ADAMRecords which have an attribute with the given name.
*
* @param tagName The name of the attribute to filter on (should be length 2)
* @return An RDD[Read] containing the subset of records with a tag that matches the given name.
*/
Expand Down Expand Up @@ -928,7 +915,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* were _originally_ paired together.
*
* @note The RDD that this is called on should be the RDD with the first read from the pair.
*
* @param secondPairRdd The rdd containing the second read from the pairs.
* @param validationStringency How stringently to validate the reads.
* @return Returns an RDD with the pair information recomputed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object DuplicateMetrics {
b2i(f(record)),
b2i(f(record) && record.getReadMapped && record.getMateMapped),
b2i(f(record) && record.getReadMapped && !record.getMateMapped),
b2i(f(record) && (!isSameContig(record.getContig, record.getMateContig)))
b2i(f(record) && (!isSameContig(record.getContigName, record.getMateContigName)))
)
}
(duplicateMetrics(isPrimary), duplicateMetrics(isSecondary))
Expand Down Expand Up @@ -97,7 +97,7 @@ object FlagStat {
rdd.map {
p =>
val mateMappedToDiffChromosome =
p.getReadPaired && p.getReadMapped && p.getMateMapped && !isSameContig(p.getContig, p.getMateContig)
p.getReadPaired && p.getReadMapped && p.getMateMapped && !isSameContig(p.getContigName, p.getMateContigName)
val (primaryDuplicates, secondaryDuplicates) = DuplicateMetrics(p)
new FlagStatMetrics(
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ case class MDTagging(
val referenceFileB = sc.broadcast(referenceFile)
reads.map(read => {
(for {
contig <- Option(read.getContig)
contigName <- Option(contig.getContigName)
contig <- Option(read.getContigName)
if read.getReadMapped
} yield {
maybeMDTagRead(read, referenceFileB.value.extract(ReferenceRegion(read)))
Expand Down Expand Up @@ -105,5 +104,5 @@ object MDTagging {

case class IncorrectMDTagException(read: AlignmentRecord, mdTag: String) extends Exception {
override def getMessage: String =
s"Read: ${read.getReadName}, pos: ${read.getContig.getContigName}:${read.getStart}, cigar: ${read.getCigar}, existing MD tag: ${read.getMismatchingPositions}, correct MD tag: $mdTag"
s"Read: ${read.getReadName}, pos: ${read.getContigName}:${read.getStart}, cigar: ${read.getCigar}, existing MD tag: ${read.getMismatchingPositions}, correct MD tag: $mdTag"
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ object IndelRealignmentTarget {
maxIndelSize: Int): Seq[IndelRealignmentTarget] = CreateIndelRealignmentTargets.time {

val region = ReferenceRegion(read.record)
val refId = read.record.getContig.getContigName
val refId = read.record.getContigName
var pos = List[ReferenceRegion]()
var referencePos = read.record.getStart
val cigar = read.samtoolsCigar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private[rdd] class RealignIndels(

// get reference from reads
val (reference, refStart, refEnd) = getReferenceFromReads(reads.map(r => new RichAlignmentRecord(r)))
val refRegion = ReferenceRegion(reads.head.record.getContig.getContigName, refStart, refEnd)
val refRegion = ReferenceRegion(reads.head.record.getContigName, refStart, refEnd)

// preprocess reads and get consensus
val readsToClean = consensusModel.preprocessReadsForRealignment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class RichAlignmentRecord(val record: AlignmentRecord) {
} else {
Strand.Forward
}
ReferencePosition(record.getContig.getContigName, fivePrimePosition, strand)
ReferencePosition(record.getContigName, fivePrimePosition, strand)
} catch {
case e: Throwable => {
println("caught " + e + " when trying to get position for " + record)
Expand Down Expand Up @@ -167,7 +167,7 @@ class RichAlignmentRecord(val record: AlignmentRecord) {

def getReferenceContext(readOffset: Int, referencePosition: Long, cigarElem: CigarElement, elemOffset: Int): ReferenceSequenceContext = {
val position = if (record.getReadMapped) {
Some(ReferencePosition(record.getContig.getContigName, referencePosition))
Some(ReferencePosition(record.getContigName, referencePosition))
} else {
None
}
Expand Down
Loading

0 comments on commit c924952

Please sign in to comment.