Skip to content

Commit

Permalink
Merge pull request #191 from hammerlab/repartition
Browse files Browse the repository at this point in the history
Add repartition parameter
  • Loading branch information
massie committed Mar 29, 2014
2 parents bc89526 + d07816f commit 25465f8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ trait SparkArgs extends Args4jBase {
var spark_kryo_buffer_size = 4
@Args4jOption(required = false, name = "-spark_add_stats_listener", usage = "Register job stat reporter, which is useful for debug/profiling.")
var spark_add_stats_listener = false
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1
@Args4jOption(required = false, name = "-repartition", usage = "Set the number of partitions to map data to")
var repartition: Int = -1
}

trait SparkCommand extends AdamCommand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class TransformArgs extends Args4jBase with ParquetArgs with SparkArgs {
var recalibrateBaseQualities: Boolean = false
@Args4jOption(required = false, name = "-known_snps", usage = "Sites-only VCF giving location of known SNPs")
var knownSnpsFile: String = null
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1
@Args4jOption(required = false, name = "-realignIndels", usage = "Locally realign indels present in reads.")
var locallyRealign: Boolean = false
}
Expand All @@ -65,9 +63,10 @@ class Transform(protected val args: TransformArgs) extends AdamSparkCommand[Tran
ParquetLogger.hadoopLoggerLevel(Level.SEVERE)

var adamRecords: RDD[ADAMRecord] = sc.adamLoad(args.inputPath)
if (args.coalesce != -1) {
log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = true)

if (args.repartition != -1) {
log.info("Repartitioning reads to to '%d' partitions".format(args.repartition))
adamRecords = adamRecords.repartition(args.repartition)
}

if (args.markDuplicates) {
Expand All @@ -87,6 +86,11 @@ class Transform(protected val args: TransformArgs) extends AdamSparkCommand[Tran
adamRecords = adamRecords.adamRealignIndels()
}

if (args.coalesce != -1) {
log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = true)
}

// NOTE: For now, sorting needs to be the last transform
if (args.sortReads) {
log.info("Sorting reads")
Expand Down

0 comments on commit 25465f8

Please sign in to comment.