Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAM-864] Don't force shuffle if reducing partition count. #866

Merged
merged 1 commit into from
Nov 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/ADAM2Fasta.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4JOption }

/**
 * Command-line arguments for the ADAM2Fasta command, declared as args4j
 * options on top of ParquetLoadSaveArgs.
 *
 * NOTE(review): this listing is taken from a diff view that carries no +/-
 * markers. The -partitions/-shuffle pair looks like the pre-change options
 * and the -coalesce/-force_shuffle_coalesce pair like their replacements;
 * confirm against the merged file before relying on all four being present.
 */
class ADAM2FastaArgs extends ParquetLoadSaveArgs {
// -partitions: number of partitions to coalesce to; defaults to 1.
@Args4JOption(required = false, name = "-partitions", usage = "Number of partitions, default 1")
var partitions: Int = 1
// -shuffle: whether to shuffle while coalescing; off by default.
@Args4JOption(required = false, name = "-shuffle", usage = "Shuffle while partitioning, default false")
var shuffle: Boolean = false
// -coalesce: target partition count. The default of -1 is not positive, and
// the visible ADAM2Fasta body only coalesces when this value is > 0.
@Args4JOption(required = false, name = "-coalesce", usage = "Choose the number of partitions to coalesce down to.")
var coalesce: Int = -1
// -force_shuffle_coalesce: request shuffle = true even when the target count
// does not exceed the current partition count (where the visible command body
// would otherwise coalesce without a shuffle).
@Args4JOption(required = false, name = "-force_shuffle_coalesce", usage = "Force shuffle while partitioning, default false.")
var forceShuffle: Boolean = false
// -line-width: column at which FASTA sequence lines are hard wrapped; the
// visible command body passes it to saveAsFasta. Defaults to 60.
@Args4JOption(required = false, name = "-line-width", usage = "Hard wrap FASTA formatted sequence at line width, default 60")
var lineWidth: Int = 60
}
Expand All @@ -53,9 +53,17 @@ class ADAM2Fasta(val args: ADAM2FastaArgs) extends BDGSparkCommand[ADAM2FastaArg
val contigFragments: RDD[NucleotideContigFragment] = sc.loadParquet(args.inputPath, projection = Some(proj))

log.info("Merging fragments and writing FASTA to disk.")
contigFragments
val contigs = contigFragments
.mergeFragments()
.coalesce(args.partitions, args.shuffle)
.saveAsFasta(args.outputPath, args.lineWidth)
val cc = if (args.coalesce > 0) {
if (args.coalesce > contigs.partitions.size || args.forceShuffle) {
contigs.coalesce(args.coalesce, shuffle = true)
} else {
contigs.coalesce(args.coalesce, shuffle = false)
}
} else {
contigs
}
cc.saveAsFasta(args.outputPath, args.lineWidth)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var repartition: Int = -1
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1
@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDD has fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false
@Args4jOption(required = false, name = "-sort_fastq_output", usage = "Sets whether to sort the FASTQ output, if saving as FASTQ. False by default. Ignored if not saving as FASTQ.")
var sortFastqOutput: Boolean = false
@Args4jOption(required = false, name = "-force_load_bam", usage = "Forces Transform to load from BAM/SAM.")
Expand Down Expand Up @@ -155,7 +157,11 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

if (args.coalesce != -1) {
log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = true)
if (args.coalesce > adamRecords.partitions.size || args.forceShuffle) {
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = true)
} else {
adamRecords = adamRecords.coalesce(args.coalesce, shuffle = false)
}
}

// NOTE: For now, sorting needs to be the last transform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class Vcf2ADAMArgs extends Args4jBase with ParquetSaveArgs {
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
var coalesce: Int = -1

@Args4jOption(required = false, name = "-force_shuffle_coalesce", usage = "Even if the repartitioned RDD has fewer partitions, force a shuffle.")
var forceShuffle: Boolean = false

@Args4jOption(required = false, name = "-onlyvariants", usage = "Output Variant objects instead of Genotypes")
var onlyvariants: Boolean = false
}
Expand All @@ -63,7 +66,11 @@ class Vcf2ADAM(val args: Vcf2ADAMArgs) extends BDGSparkCommand[Vcf2ADAMArgs] wit

var adamVariants: RDD[VariantContext] = sc.loadVcf(args.vcfPath, sd = dictionary)
if (args.coalesce > 0) {
adamVariants = adamVariants.coalesce(args.coalesce, true)
if (args.coalesce > adamVariants.partitions.size || args.forceShuffle) {
adamVariants = adamVariants.coalesce(args.coalesce, shuffle = true)
} else {
adamVariants = adamVariants.coalesce(args.coalesce, shuffle = false)
}
}

if (args.onlyvariants) {
Expand Down