Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced Contig with ContigName in AlignmentRecord and related changes #988

Merged
merged 1 commit into from
Apr 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ class Fragments2Reads(protected val args: Fragments2ReadsArgs) extends BDGSparkC
SequenceDictionary.empty,
RecordGroupDictionary.empty)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.bdgenomics.adam.models.{
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Filter }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.read.MDTagging
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, MDTagging }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.formats.avro.AlignmentRecord
import org.bdgenomics.utils.cli._
Expand Down Expand Up @@ -267,7 +267,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
)
}

val aRdd =
val aRdd: AlignmentRecordRDD =
if (args.forceLoadBam) {
sc.loadBam(args.inputPath)
} else if (args.forceLoadFastq) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ConsensusGeneratorFromKnowns(file: String, @transient sc: SparkContext) ex
// get region
val start = reads.map(_.record.getStart).min
val end = reads.map(_.getEnd).max
val refId = reads.head.record.getContig.getContigName
val refId = reads.head.record.getContigName

val region = ReferenceRegion(refId, start, end + 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ConsensusGeneratorFromReads extends ConsensusGenerator {
Consensus.generateAlternateConsensus(
r.getSequence,
ReferencePosition(
r.getContig.getContigName,
r.getContigName,
r.getStart
),
r.samtoolsCigar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ class AlignmentRecordConverter extends Serializable {
})

// set the reference name, and alignment position, for mate
Option(adamRecord.getMateContig)
.map(_.getContigName)
Option(adamRecord.getMateContigName)
.foreach(builder.setMateReferenceName)
Option(adamRecord.getMateAlignmentStart)
.foreach(s => builder.setMateAlignmentStart(s.toInt + 1))
Expand Down Expand Up @@ -155,8 +154,8 @@ class AlignmentRecordConverter extends Serializable {
// only set alignment flags if read is aligned
if (m) {
// if we are aligned, we must have a reference
assert(adamRecord.getContig != null, "Cannot have null contig if aligned.")
builder.setReferenceName(adamRecord.getContig.getContigName)
require(adamRecord.getContigName != null, "Cannot have null contig if aligned.")
builder.setReferenceName(adamRecord.getContigName)

// set the cigar, if provided
Option(adamRecord.getCigar).foreach(builder.setCigarString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object FragmentConverter extends Serializable {

// build record
AlignmentRecord.newBuilder()
.setContig(contig)
.setContigName(contig.getContigName)
.setStart(fragmentRegion.start)
.setEnd(fragmentRegion.end)
.setSequence(fragmentString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ class SAMRecordConverter extends Serializable with Logging {
// This prevents looking up a -1 in the sequence dictionary
val readReference: Int = samRecord.getReferenceIndex
if (readReference != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
dict(samRecord.getReferenceName).foreach { (rec) =>
builder.setContig(SequenceRecord.toADAMContig(rec))
}
builder.setContigName(samRecord.getReferenceName)

// set read alignment flag
val start: Int = samRecord.getAlignmentStart
Expand Down Expand Up @@ -127,9 +125,7 @@ class SAMRecordConverter extends Serializable with Logging {
val mateReference: Int = samRecord.getMateReferenceIndex

if (mateReference != SAMRecord.NO_ALIGNMENT_REFERENCE_INDEX) {
dict(samRecord.getMateReferenceName).foreach { (rec) =>
builder.setMateContig(SequenceRecord.toADAMContig(rec))
}
builder.setMateContigName(samRecord.getMateReferenceName)

val mateStart = samRecord.getMateAlignmentStart
if (mateStart > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object ReferencePosition extends Serializable {
* @see fivePrime
*/
def apply(record: AlignmentRecord): ReferencePosition = {
new ReferencePosition(record.getContig.getContigName, record.getStart)
new ReferencePosition(record.getContigName, record.getStart)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object ReferenceRegion {
if (record.getReadMapped) {
Some(
ReferenceRegion(
record.getContig.getContigName,
record.getContigName,
record.getStart,
record.getEnd
)
Expand All @@ -92,7 +92,7 @@ object ReferenceRegion {
}

def apply(record: AlignmentRecord): ReferenceRegion = {
ReferenceRegion(record.getContig.getContigName, record.getStart, record.getEnd)
ReferenceRegion(record.getContigName, record.getStart, record.getEnd)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,27 +345,6 @@ object SequenceRecord {
fromADAMContig(fragment.getContig)
}

/**
* Convert an Read into one or more SequenceRecords.
* The reason that we can't simply use the "fromSpecificRecord" method, below, is that each Read
* can (through the fact that it could be a pair of reads) contain 1 or 2 possible SequenceRecord entries
* for the SequenceDictionary itself. Both have to be extracted, separately.
*
* @param rec The Read from which to extract the SequenceRecord entries
* @return a list of all SequenceRecord entries derivable from this record.
*/
def fromADAMRecord(rec: AlignmentRecord): Set[SequenceRecord] = {
assert(rec != null, "Read was null")
if (rec.getContig != null || rec.getMateContig != null) {
// The contig should be null for unmapped read
List(Option(rec.getContig), Option(rec.getMateContig))
.flatten
.map(fromADAMContig)
.toSet
} else
Set()
}

def fromSpecificRecord(rec: IndexedRecord): SequenceRecord = {
val schema = rec.getSchema
if (schema.getField("referenceId") != null) {
Expand All @@ -381,4 +360,6 @@ object SequenceRecord {
throw new AssertionError("Missing information to generate SequenceRecord")
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val projected: RDD[T] = loadParquet[T](filePath, None, projection = Some(projection))

val recs: RDD[SequenceRecord] =
if (isADAMRecord) {
projected.asInstanceOf[RDD[AlignmentRecord]].distinct().flatMap(rec => SequenceRecord.fromADAMRecord(rec))
} else if (isADAMContig) {
if (isADAMContig) {
projected.asInstanceOf[RDD[NucleotideContigFragment]].distinct().map(ctg => SequenceRecord.fromADAMContigFragment(ctg))
} else {
projected.distinct().map(SequenceRecord.fromSpecificRecord(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,14 @@ import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.AlignmentRecordConverter
import org.bdgenomics.adam.models.SequenceRecord
import org.bdgenomics.adam.rdd.ADAMSequenceDictionaryRDDAggregator
import org.apache.spark.Logging
import org.bdgenomics.formats.avro._
import scala.collection.JavaConversions._

class FragmentRDDFunctions(rdd: RDD[Fragment]) extends ADAMSequenceDictionaryRDDAggregator[Fragment](rdd) {
class FragmentRDDFunctions(rdd: RDD[Fragment]) extends Serializable with Logging {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this removed? I'm actually not sure what ADAMSequenceDictionaryRDDAggregator was for in the first place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ADAMSequenceDictionaryRDDAggregator
seemed to be used to extract SeqDicts from existing object, but didn't seem to make sense anymore in context of having removed Contig from AlignmentRecord, including within Fragmen - and importantly was not actually used in any code or tests. Extending ADAMSequenceDictionaryRDDAggregator here in fact blocked the Contig factoring out as it requires data which is no longer in AlignmentRecord. I searched for any usages of the removed functions and there were none, so I removed it as a superclass.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me, I'm always in favor of removing unnecessary code.

I wonder if this changes anyone's opinion on this commit ryan-williams@c5a8f51 that adds extension to some of the Functions classes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both this and the commit you pointed at @heuermh LGTM.


def toReads: RDD[AlignmentRecord] = {
val converter = new AlignmentRecordConverter
rdd.flatMap(converter.convertFragment)
}

def getSequenceRecordsFromElement(elem: Fragment): Set[SequenceRecord] = {
val alignments = asScalaBuffer(elem.getAlignments)
alignments.flatMap(SequenceRecord.fromADAMRecord).toSet
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.avro.specific.{ SpecificDatumWriter, SpecificRecordBase }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, FileUtil, Path }
import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.{ Logging, SparkContext }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
Expand All @@ -53,9 +53,7 @@ import scala.annotation.tailrec
import scala.language.implicitConversions
import scala.reflect.ClassTag

class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
extends ADAMSequenceDictionaryRDDAggregator[AlignmentRecord](rdd) {

class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) extends Serializable with Logging {
/**
* Calculates the subset of the RDD whose AlignmentRecords overlap the corresponding
* query ReferenceRegion. Equality of the reference sequence (to which these are aligned)
Expand All @@ -72,7 +70,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
def filterByOverlappingRegion(query: ReferenceRegion): RDD[AlignmentRecord] = {
def overlapsQuery(rec: AlignmentRecord): Boolean =
rec.getReadMapped &&
rec.getContig.getContigName == query.referenceName &&
rec.getContigName == query.referenceName &&
rec.getStart < query.end &&
rec.getEnd > query.start
rdd.filter(overlapsQuery)
Expand Down Expand Up @@ -126,7 +124,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* As such, we must force the user to pass in the schema.
*
* @tparam T The type of the specific record we are saving.
*
* @param filename Path to save records to.
* @param sc SparkContext used for identifying underlying file system.
* @param schema Schema of records we are saving.
Expand Down Expand Up @@ -171,7 +168,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamSave
* @see adamAlignedRecordSave
*/
Expand Down Expand Up @@ -212,7 +208,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamSave
* @see adamSAMSave
* @see saveAsParquet
Expand All @@ -235,7 +230,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @see adamAlignedRecordSave
* @see adamSAMSave
* @see saveAsParquet
Expand Down Expand Up @@ -263,9 +257,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* aligned to.
* @param rgd Record group dictionary describing the record groups these
* reads are from.
*
* @return A string on the driver representing this RDD of reads in SAM format.
*
* @see adamConvertToSAM
*/
def adamSAMString(sd: SequenceDictionary,
Expand Down Expand Up @@ -593,10 +585,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
}
}

def getSequenceRecordsFromElement(elem: AlignmentRecord): Set[SequenceRecord] = {
SequenceRecord.fromADAMRecord(elem).toSet
}

/**
* Converts an RDD of ADAM read records into SAM records.
*
Expand Down Expand Up @@ -635,7 +623,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
*
* @param kmerLength The value of _k_ to use for cutting _k_-mers.
* @return Returns an RDD containing k-mer/count pairs.
*
* @see adamCountQmers
*/
def adamCountKmers(kmerLength: Int): RDD[(String, Long)] = {
Expand Down Expand Up @@ -696,15 +683,13 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* Realigns indels using a concensus-based heuristic.
*
* @see RealignIndels
*
* @param isSorted If the input data is sorted, setting this parameter to true avoids a second sort.
* @param maxIndelSize The size of the largest indel to use for realignment.
* @param maxConsensusNumber The maximum number of consensus sequences to realign against per
* target region.
* @param lodThreshold Log-odds threhold to use when realigning; realignments are only finalized
* if the log-odds threshold is exceeded.
* @param maxTargetSize The maximum width of a single target region for realignment.
*
* @return Returns an RDD of mapped reads which have been realigned.
*/
def adamRealignIndels(
Expand All @@ -724,6 +709,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])

/**
* Groups all reads by record group and read name
*
* @return SingleReadBuckets with primary, secondary and unmapped reads
*/
def adamSingleReadBuckets(): RDD[SingleReadBucket] = {
Expand Down Expand Up @@ -759,6 +745,7 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])

/**
* Returns the subset of the ADAMRecords which have an attribute with the given name.
*
* @param tagName The name of the attribute to filter on (should be length 2)
* @return An RDD[Read] containing the subset of records with a tag that matches the given name.
*/
Expand Down Expand Up @@ -928,7 +915,6 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
* were _originally_ paired together.
*
* @note The RDD that this is called on should be the RDD with the first read from the pair.
*
* @param secondPairRdd The rdd containing the second read from the pairs.
* @param validationStringency How stringently to validate the reads.
* @return Returns an RDD with the pair information recomputed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object DuplicateMetrics {
b2i(f(record)),
b2i(f(record) && record.getReadMapped && record.getMateMapped),
b2i(f(record) && record.getReadMapped && !record.getMateMapped),
b2i(f(record) && (!isSameContig(record.getContig, record.getMateContig)))
b2i(f(record) && (!isSameContig(record.getContigName, record.getMateContigName)))
)
}
(duplicateMetrics(isPrimary), duplicateMetrics(isSecondary))
Expand Down Expand Up @@ -97,7 +97,7 @@ object FlagStat {
rdd.map {
p =>
val mateMappedToDiffChromosome =
p.getReadPaired && p.getReadMapped && p.getMateMapped && !isSameContig(p.getContig, p.getMateContig)
p.getReadPaired && p.getReadMapped && p.getMateMapped && !isSameContig(p.getContigName, p.getMateContigName)
val (primaryDuplicates, secondaryDuplicates) = DuplicateMetrics(p)
new FlagStatMetrics(
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ case class MDTagging(
val referenceFileB = sc.broadcast(referenceFile)
reads.map(read => {
(for {
contig <- Option(read.getContig)
contigName <- Option(contig.getContigName)
contig <- Option(read.getContigName)
if read.getReadMapped
} yield {
maybeMDTagRead(read, referenceFileB.value.extract(ReferenceRegion(read)))
Expand Down Expand Up @@ -105,5 +104,5 @@ object MDTagging {

case class IncorrectMDTagException(read: AlignmentRecord, mdTag: String) extends Exception {
override def getMessage: String =
s"Read: ${read.getReadName}, pos: ${read.getContig.getContigName}:${read.getStart}, cigar: ${read.getCigar}, existing MD tag: ${read.getMismatchingPositions}, correct MD tag: $mdTag"
s"Read: ${read.getReadName}, pos: ${read.getContigName}:${read.getStart}, cigar: ${read.getCigar}, existing MD tag: ${read.getMismatchingPositions}, correct MD tag: $mdTag"
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ object IndelRealignmentTarget {
maxIndelSize: Int): Seq[IndelRealignmentTarget] = CreateIndelRealignmentTargets.time {

val region = ReferenceRegion(read.record)
val refId = read.record.getContig.getContigName
val refId = read.record.getContigName
var pos = List[ReferenceRegion]()
var referencePos = read.record.getStart
val cigar = read.samtoolsCigar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private[rdd] class RealignIndels(

// get reference from reads
val (reference, refStart, refEnd) = getReferenceFromReads(reads.map(r => new RichAlignmentRecord(r)))
val refRegion = ReferenceRegion(reads.head.record.getContig.getContigName, refStart, refEnd)
val refRegion = ReferenceRegion(reads.head.record.getContigName, refStart, refEnd)

// preprocess reads and get consensus
val readsToClean = consensusModel.preprocessReadsForRealignment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class RichAlignmentRecord(val record: AlignmentRecord) {
} else {
Strand.Forward
}
ReferencePosition(record.getContig.getContigName, fivePrimePosition, strand)
ReferencePosition(record.getContigName, fivePrimePosition, strand)
} catch {
case e: Throwable => {
println("caught " + e + " when trying to get position for " + record)
Expand Down Expand Up @@ -167,7 +167,7 @@ class RichAlignmentRecord(val record: AlignmentRecord) {

def getReferenceContext(readOffset: Int, referencePosition: Long, cigarElem: CigarElement, elemOffset: Int): ReferenceSequenceContext = {
val position = if (record.getReadMapped) {
Some(ReferencePosition(record.getContig.getContigName, referencePosition))
Some(ReferencePosition(record.getContigName, referencePosition))
} else {
None
}
Expand Down
Loading