Skip to content

Commit

Permalink
[ADAM-864] Don't force shuffle if reducing partition count.
Browse files Browse the repository at this point in the history
Resolves #864. In Spark, coalescing will reduce the number of partitions in an
RDD without performing a shuffle, but coalescing will only increase the number
of partitions if a shuffle is performed. This PR modifies Transform and Vcf2ADAM
to check whether the coalesce option will increase or decrease the partition
count. Additionally, it adds a flag that allows the user to force a shuffle;
this may be desirable as this causes a HashPartitioned shuffle, which may
improve the balance of records across partitions.
  • Loading branch information
fnothaft committed Oct 13, 2015
1 parent 2c0bd31 commit b08f323
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var repartition: Int = -1
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1
@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDD has fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false
@Args4jOption(required = false, name = "-sort_fastq_output", usage = "Sets whether to sort the FASTQ output, if saving as FASTQ. False by default. Ignored if not saving as FASTQ.")
var sortFastqOutput: Boolean = false
@Args4jOption(required = false, name = "-force_load_bam", usage = "Forces Transform to load from BAM/SAM.")
Expand Down Expand Up @@ -155,7 +157,11 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

if (args.coalesce != -1) {
log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = true)
if (args.coalesce > adamRecords.partitions.size || args.forceShuffle) {
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = true)
} else {
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = false)
}
}

// NOTE: For now, sorting needs to be the last transform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class Vcf2ADAMArgs extends Args4jBase with ParquetSaveArgs {
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1

@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDD has fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false

@Args4jOption(required = false, name = "-onlyvariants", usage = "Output Variant objects instead of Genotypes")
var onlyvariants: Boolean = false
}
Expand All @@ -63,7 +66,11 @@ class Vcf2ADAM(val args: Vcf2ADAMArgs) extends BDGSparkCommand[Vcf2ADAMArgs] wit

var adamVariants: RDD[VariantContext] = sc.loadVcf(args.vcfPath, sd = dictionary)
if (args.coalesce > 0) {
adamVariants = adamVariants.coalesce(args.coalesce, true)
if (args.coalesce > adamVariants.partitions.size || args.forceShuffle) {
adamVariants = adamVariants.coalesce(args.coalesce, shuffle = true)
} else {
adamVariants = adamVariants.coalesce(args.coalesce, shuffle = false)
}
}

if (args.onlyvariants) {
Expand Down

0 comments on commit b08f323

Please sign in to comment.