[ADAM-651] Hive-style partitioning of parquet files by genomic position #1911
Conversation
Commits:
- removed addChrPrefix parameter
- Address PR comments - part 1
- Address PR comments - part 2
- fix nits
- Rebased
- Address review comments - part 3
- address reviewer comments - white space and redundant asserts
- fixed isPartitioned
- fixed codacy issue with return in isPartitioned
- made writePartitionedParquetFlag private
- updated overlap calc in refereceRegionsToDatasetQueryStirng try2
- add filter in genomicRDD
- move filterOverlapping to alignment record
- another try
- trying as filterDatasetByOverlappingRegions
- move filter mapq out of AlignmentRecord
- clean up, ready to retest with Mango feb 12th morning
- added GenotypeRDD filterByOverlappingRegion
- support dataset region filter in all types
- updated ADAMContext to use filterByOverlap and added docs
- fix doc indent by mvn
- removed public referenceRegionQueryStrin function form ADAMContext
Please review @heuermh, @akmorrow13 and others, and let me know changes needed. TODO: I have not yet resolved the documented ambiguous behavior when only some directories in a glob are partitioned. I suggest we keep the documented ambiguous behavior if a merge is possible for v24 this week, and update later.
Test PASSed.
@@ -26,7 +26,7 @@ import java.nio.file.Paths
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.LongWritable
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.spark.SparkContext
+import org.apache.spark.{ SparkContext }
remove brackets
Test PASSed.
Test PASSed.
Test PASSed.
My recent commits here are an effort to better deal with the partitionBinSize parameter. Prior to the previous commit, a user would need to keep track of what partition size they used when they wrote a dataset, if they used something other than the default of 1 Mb. The tracking issue is solved now by writing the partition size as an integer to the flag file. The next goal I have is for the user at read time to not need to retrieve or supply the partitionBinSize when working with the dataset, because this is already set for the dataset at write time, and the dataset should know its own partitionBinSize and use it. The best way I see to deal with this is to have the dataset itself carry that value. My concern, though, is that this is a specific implementation detail for partitioned Parquet-backed datasets, and we may not want it adding noise to the general API. Let me know if you see a better way. Pinging @fnothaft and all for this and general review.
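For concreteness, here is a minimal sketch of the flag-file mechanism described above. I'm assuming a flag file named `_partitionedByStartPos` at the dataset root holding the bin size as a raw int; the actual file name and encoding in this PR may differ.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Record the bin size at write time so readers never have to supply it again.
def writePartitionedParquetFlag(pathName: String, partitionSize: Int): Unit = {
  val path = new Path(pathName, "_partitionedByStartPos")
  val fs = path.getFileSystem(new Configuration())
  val os = fs.create(path)
  os.writeInt(partitionSize)
  os.close()
}

// Recover the bin size at read time from the same flag file.
def getPartitionedBinSize(pathName: String): Int = {
  val path = new Path(pathName, "_partitionedByStartPos")
  val fs = path.getFileSystem(new Configuration())
  val is = fs.open(path)
  try is.readInt() finally is.close()
}
```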
Test FAILed. Build result: FAILURE. Jenkins checked out revision 026b51f (origin/pr/1911/merge) and triggered the ADAM-prb matrix; all four configurations (Hadoop 2.6.2 and 2.7.3 × Scala 2.10 and 2.11, on Spark 2.2.1, centos) completed with result FAILURE.
5c29dde passes all tests for me locally, any ideas why it fails above?
Mostly stylistic nits, otherwise generally LGTM! Thanks @jpdna!
@@ -131,6 +131,8 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
 var storageLevel: String = "MEMORY_ONLY"
 @Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
 var disableProcessingStep = false
+@Args4jOption(required = false, name = "-save_as_dataset", usage = "EXPERIMENTAL: Save as a Parquet format Spark-SQL dataset")
The doc on this is a bit misleading. The partitioning is enabled by Spark SQL, but is not really Spark SQL specific. I'd say that providing this flag saves the data partitioned by genomic locus using Hive-style partitioning.
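To make "Hive-style partitioning by genomic locus" concrete, here is a minimal sketch in plain Spark SQL. The column names `contigName` and `positionBin` come from the query strings elsewhere in this PR, but the exact write path here is an illustration, not the PR's implementation:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{ col, floor }

// Bin each record by start position, then let Spark lay the files out as
// Hive-style directories: <path>/contigName=chr1/positionBin=12/part-*.parquet
def saveAsPartitionedParquet(df: DataFrame, pathName: String, binSize: Int = 1000000): Unit = {
  df.withColumn("positionBin", floor(col("start") / binSize))
    .write
    .partitionBy("contigName", "positionBin")
    .parquet(pathName)
}
```

Because the partition values are encoded in directory names, a later query that constrains `contigName` and `positionBin` can skip whole directories without opening any Parquet files.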
Also, "Spark SQL" vs "Spark-SQL".
Resolved, and changed the parameter to an integer so the user can specify the bin size.
outputRdd.save(args,
  isSorted = args.sortReads || args.sortLexicographically)
if (args.saveAsDataset) {
  outputRdd.saveAsPartitionedParquet(args.outputPath)
A bit irrelevant to this line, but OOC, how does saveAsPartitionedParquet handle data that isn't aligned? It seems like we may want to warn if there isn't a sequence dictionary attached.
It will save all data to partition bin number 0, and doing so will not result in any benefit. A warning stating that has been added with log.warn.
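A sketch of that behavior, assuming slf4j logging and an optional start position for possibly-unaligned records (names illustrative, not the PR's exact code):

```scala
import org.slf4j.LoggerFactory

object PartitionBinning {
  private val log = LoggerFactory.getLogger(getClass)

  // Aligned records bin by start position; unaligned records carry no
  // position, so they all collapse into bin 0 and gain nothing from pruning.
  def positionBin(start: Option[Long], binSize: Int): Long = {
    start match {
      case Some(s) => s / binSize
      case None =>
        log.warn("Saving unaligned data as partitioned Parquet gives no query benefit; writing to bin 0.")
        0L
    }
  }
}
```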
val reads = loadParquetAlignments(pathName)

val datasetBoundAlignmentRecordRDD = if (regions.nonEmpty) {
  DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps, Some(partitionedBinSize))
Nit: Long line, should be broken up.
  DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps, Some(partitionedBinSize))
    .filterByOverlappingRegions(regions)
} else {
  DatasetBoundAlignmentRecordRDD(reads.dataset, reads.sequences, reads.recordGroups, reads.processingSteps)
Nit: Long line, should be broken up.
val genotypes = loadParquetGenotypes(pathName)

val datasetBoundGenotypeRDD = if (regions.nonEmpty) {
  DatasetBoundGenotypeRDD(genotypes.dataset, genotypes.sequences, genotypes.samples, genotypes.headerLines, Some(partitionedBinSize))
Nit: Long line, should be broken up.
* If a glob is used, all directories within the glob must be partitioned, and must have been saved
* using the same partitioned bin size. Behavior is undefined if this requirement is not met.
*/
def getPartitionedBinSize(pathName: String): Int = {
This function should be private, or at least private[rdd].
Done, but had to add a separate public isPartitioned function, because Mango needs to be able to check whether data is partitioned.
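The public check can stay tiny; a sketch, again assuming the `_partitionedByStartPos` flag file:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// A dataset is partitioned iff the flag file written at save time exists.
def isPartitioned(pathName: String): Boolean = {
  val path = new Path(pathName, "_partitionedByStartPos")
  path.getFileSystem(new Configuration()).exists(path)
}
```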
"\' and (end > " + r.start + " and start < " + r.end + "))") | ||
.mkString(" or ") | ||
} | ||
|
Nit: extra whitespace.
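For readers following along, a sketch of how the full predicate above could be assembled. The tail of each clause matches the diff; the positionBin bounds and look-back arithmetic are my assumption about how the bin pruning works (looking back widens the scan so records that start in an earlier bin but overlap the query region are not missed):

```scala
import org.bdgenomics.adam.models.ReferenceRegion

def referenceRegionsToDatasetQueryString(
  regions: Iterable[ReferenceRegion],
  partitionSize: Int = 1000000,
  partitionedLookBackNum: Int = 1): String = {

  regions.map(r => {
    // Look back one or more bins so overlapping records whose start falls
    // in an earlier bin are still scanned.
    val startBin = math.max(0L, r.start / partitionSize - partitionedLookBackNum)
    val endBin = r.end / partitionSize
    "(contigName = \'" + r.referenceName + "\' and positionBin >= " + startBin +
      " and positionBin <= " + endBin +
      " and (end > " + r.start + " and start < " + r.end + "))"
  }).mkString(" or ")
}
```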
* ReferenceRegion, defaults to 1
* @return Returns a new DatasetBoundGenomicRDD with ReferenceRegions filter applied.
*/
def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion],
I'd looove this to move into the new DatasetBoundGenomicDataset trait which came in @akmorrow13's 6051321, and then this would override filterByOverlappingRegions. However, this has a slightly different signature due to the addition of optPartitionedLookBackNum. I propose that we either:

- Move this into DatasetBoundGenomicDataset and make this method protected, and then override filterByOverlappingRegions(querys) to call filterDatasetByOverlappingRegions(querys).
- Or, keep this as is now but open a ticket to close up this API in 0.25.0.

Thoughts?

Unrelated nit: querys should be queries.
Alternatively - upon thinking about it, I don't like this optPartitionedLookBackNum as a parameter to the filter-by-overlap function anyhow, as it is a config parameter tied to the dataset, just like partitionedBinSize, that should only need to be set once when the dataset is created. We could instead add it as another optional parameter to the DatasetBound type constructors. If we did that, the optPartitionedLookBackNum parameter could go away, making the filterByOverlappingRegions override in DatasetBoundGenomicDataset work cleanly.
SGTM!
@@ -2531,6 +2546,23 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: Product, V <: A
  saveSequences(filePath)
}

protected def referenceRegionsToDatasetQueryString(regions: Iterable[ReferenceRegion], partitionSize: Int = 1000000, partitionedLookBackNum: Int = 1): String = {
I see that a lot of downstream classes that implement GenomicDataset wind up overriding filterDatasetByOverlappingRegions with transformDataset(ds => ds.filter(referenceRegionsToDatasetQueryString(...))). I get why they have to do this, and I think fixing it is out of scope for this PR. However, can you open a ticket for us to move that code into a trait in a future release?
In the update, filterDatasetByOverlappingRegions is implemented generically in the trait DatasetBoundGenomicDataset, thus there is no more overriding.
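Roughly, the resolved shape looks like this. This is a sketch, not the verbatim trait; I'm assuming DatasetBoundGenomicDataset can see the dataset, a transformDataset, the constructor-supplied partitionedBinSize, and the query-string helper:

```scala
import org.apache.spark.sql.Dataset
import org.bdgenomics.adam.models.ReferenceRegion

trait DatasetBoundGenomicDataset[U <: Product, V] {

  val dataset: Dataset[U]
  val partitionedBinSize: Option[Int]

  def transformDataset(tFn: Dataset[U] => Dataset[U]): V

  protected def referenceRegionsToDatasetQueryString(
    regions: Iterable[ReferenceRegion],
    partitionSize: Int): String

  // Implemented once here, so concrete types no longer override it.
  def filterDatasetByOverlappingRegions(regions: Iterable[ReferenceRegion]): V = {
    transformDataset(d => d.filter(
      referenceRegionsToDatasetQueryString(regions, partitionedBinSize.getOrElse(1000000))))
  }
}
```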
override def filterDatasetByOverlappingRegions(querys: Iterable[ReferenceRegion],
  optPartitionedLookBackNum: Option[Int] = Some(1)): FragmentRDD = {
  transformDataset((d: Dataset[org.bdgenomics.adam.sql.Fragment]) =>
    d.filter(referenceRegionsToDatasetQueryString(querys, partitionedBinSize.get, optPartitionedLookBackNum.get)))
This won't work for Fragment, no? Fragment doesn't have contigName, start, or end fields.
You are right, removed Fragment.
Replaced by #1922
Fixes #651.
Uses genomic range binning to write partitioned Parquet files, readable by the Spark Dataset API.
Significantly improves latency when filtering by genomic regions at read time.
Replaces #1878
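As a hypothetical read-side illustration of the latency claim: since the partition columns live in directory names, Spark prunes non-matching directories before touching any Parquet data (the path and bin numbers below are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-pruning-demo").getOrCreate()

// Only the contigName=chr1/positionBin={11,12} directories are scanned,
// rather than every file in the dataset.
val reads = spark.read.parquet("sample.alignments.adam")
  .where("contigName = 'chr1' and positionBin >= 11 and positionBin <= 12 " +
    "and end > 12000000 and start < 12500000")
reads.show()
```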