Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAM-914] Create a GenomicRegionPartitioner given a partition count. #915

Merged
merged 1 commit into from
Jan 11, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,18 @@ case class GenomicPositionPartitioner(numParts: Int, seqLengths: Map[String, Lon

object GenomicPositionPartitioner {

  /**
   * Creates a GenomicPositionPartitioner with a specific number of partitions.
   *
   * @param numParts The number of partitions to have in the new partitioner.
   * @param seqDict A sequence dictionary describing the known genomic contigs.
   * @return Returns a partitioner that divides the known genome into a set number of partitions.
   */
  def apply(numParts: Int, seqDict: SequenceDictionary): GenomicPositionPartitioner =
    GenomicPositionPartitioner(numParts, extractLengthMap(seqDict))

  /**
   * Builds a lookup from sequence name to sequence length out of the records
   * of a sequence dictionary.
   *
   * Visibility is limited to the enclosing `rdd` package; the region
   * partitioner's companion object reuses this helper.
   *
   * @param seqDict A sequence dictionary describing the known genomic contigs.
   * @return Returns a map keyed by record name whose values are the record lengths.
   */
  private[rdd] def extractLengthMap(seqDict: SequenceDictionary): Map[String, Long] =
    seqDict.records.toSeq.map(rec => (rec.name, rec.length)).toMap
}

case class GenomicRegionPartitioner(partitionSize: Long, seqLengths: Map[String, Long], start: Boolean = true) extends Partitioner with Logging {
Expand Down Expand Up @@ -140,6 +147,26 @@ case class GenomicRegionPartitioner(partitionSize: Long, seqLengths: Map[String,
}

object GenomicRegionPartitioner {

  /**
   * Creates a GenomicRegionPartitioner where partitions cover a specific range of the genome.
   *
   * @param partitionSize The number of bases in the reference genome that each partition should cover.
   * @param seqDict A sequence dictionary describing the known genomic contigs.
   * @return Returns a partitioner that divides the known genome into partitions of fixed size.
   */
  def apply(partitionSize: Long, seqDict: SequenceDictionary): GenomicRegionPartitioner =
    GenomicRegionPartitioner(partitionSize, GenomicPositionPartitioner.extractLengthMap(seqDict))

  /**
   * Creates a GenomicRegionPartitioner with a specific number of partitions.
   *
   * The partition size is derived by dividing the summed length of all
   * sequences in the dictionary by the requested partition count, using
   * truncating integer division.
   * NOTE(review): because the size is truncated, the partitioner built from it
   * may not end up with exactly `numParts` partitions; confirm against the
   * case class implementation.
   *
   * @param numParts The number of partitions to have in the new partitioner. Must be positive.
   * @param seqDict A sequence dictionary describing the known genomic contigs.
   * @return Returns a partitioner that divides the known genome into a set number of partitions.
   * @throws IllegalArgumentException if numParts is not positive.
   */
  def apply(numParts: Int, seqDict: SequenceDictionary): GenomicRegionPartitioner = {
    // Fail early with a descriptive message rather than an ArithmeticException
    // (numParts == 0) or a negative partition size (numParts < 0).
    require(numParts > 0, s"numParts must be positive, but was $numParts.")

    val lengths = GenomicPositionPartitioner.extractLengthMap(seqDict)

    // When the total length is smaller than numParts the division truncates to
    // zero; clamp so every partition covers at least one base.
    val partitionSize = math.max(1L, lengths.values.sum / numParts)

    GenomicRegionPartitioner(partitionSize, lengths)
  }
}