Skip to content

Commit

Permalink
[ADAM-864] Don't force shuffle if reducing partition count.
Browse files Browse the repository at this point in the history
Resolves #864. In Spark, coalescing will reduce the number of partitions in an
RDD without performing a shuffle, but coalescing will only increase the number
of partitions if a shuffle is performed. This PR modifies Transform and Vcf2ADAM
to check whether the coalesce option will increase or decrease the partition
count. Additionally, it adds a flag that allows the user to force a shuffle;
this may be desirable as this causes a HashPartitioned shuffle, which may
improve the balance of records across partitions. Additionally, we modify
ADAM2Fasta to support similar options.
  • Loading branch information
fnothaft committed Oct 13, 2015
1 parent 2c0bd31 commit 88368bc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
22 changes: 15 additions & 7 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fasta.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4JOption }

class ADAM2FastaArgs extends ParquetLoadSaveArgs {
@Args4JOption(required = false, name = "-partitions", usage = "Number of partitions, default 1")
var partitions: Int = 1
@Args4JOption(required = false, name = "-shuffle", usage = "Shuffle while partitioning, default false")
var shuffle: Boolean = false
@Args4JOption(required = false, name = "-coalesce", usage = "Choose the number of partitions to coalesce down to.")
var coalesce: Int = -1
@Args4JOption(required = false, name = "-force_shuffle_coalesce", usage = "Force shuffle while partitioning, default false.")
var forceShuffle: Boolean = false
@Args4JOption(required = false, name = "-line-width", usage = "Hard wrap FASTA formatted sequence at line width, default 60")
var lineWidth: Int = 60
}
Expand All @@ -53,9 +53,17 @@ class ADAM2Fasta(val args: ADAM2FastaArgs) extends BDGSparkCommand[ADAM2FastaArg
val contigFragments: RDD[NucleotideContigFragment] = sc.loadParquet(args.inputPath, projection = Some(proj))

log.info("Merging fragments and writing FASTA to disk.")
contigFragments
val contigs = contigFragments
.mergeFragments()
.coalesce(args.partitions, args.shuffle)
.saveAsFasta(args.outputPath, args.lineWidth)
val cc = if (args.coalesce > 0) {
if (args.coalesce > contigs.partitions.size || args.forceShuffle) {
contigs.coalesce(args.coalesce, shuffle = true)
} else {
contigs.coalesce(args.coalesce, shuffle = false)
}
} else {
contigs
}
cc.saveAsFasta(args.outputPath, args.lineWidth)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var repartition: Int = -1
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1
@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDD has fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false
@Args4jOption(required = false, name = "-sort_fastq_output", usage = "Sets whether to sort the FASTQ output, if saving as FASTQ. False by default. Ignored if not saving as FASTQ.")
var sortFastqOutput: Boolean = false
@Args4jOption(required = false, name = "-force_load_bam", usage = "Forces Transform to load from BAM/SAM.")
Expand Down Expand Up @@ -155,7 +157,11 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

if (args.coalesce != -1) {
log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = true)
if (args.coalesce > adamRecords.partitions.size || args.forceShuffle) {
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = true)
} else {
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = false)
}
}

// NOTE: For now, sorting needs to be the last transform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class Vcf2ADAMArgs extends Args4jBase with ParquetSaveArgs {
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1

@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDD has fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false

@Args4jOption(required = false, name = "-onlyvariants", usage = "Output Variant objects instead of Genotypes")
var onlyvariants: Boolean = false
}
Expand All @@ -63,7 +66,11 @@ class Vcf2ADAM(val args: Vcf2ADAMArgs) extends BDGSparkCommand[Vcf2ADAMArgs] wit

var adamVariants: RDD[VariantContext] = sc.loadVcf(args.vcfPath, sd = dictionary)
if (args.coalesce > 0) {
adamVariants = adamVariants.coalesce(args.coalesce, true)
if (args.coalesce > adamVariants.partitions.size || args.forceShuffle) {
adamVariants = adamVariants.coalesce(args.coalesce, shuffle = true)
} else {
adamVariants = adamVariants.coalesce(args.coalesce, shuffle = false)
}
}

if (args.onlyvariants) {
Expand Down

0 comments on commit 88368bc

Please sign in to comment.