
Add SortedGenomicRDD trait, refactor shuffle joins and pipe #1590

Conversation

fnothaft
Member

@fnothaft fnothaft commented Jul 7, 2017

Resolves #1583, #1584, #1519, #1558. Based on #1397.

  1. Adds a SortedGenomicRDD trait, with concrete implementations for all datatypes. This trait eliminates the need for the partition map (#1558) by tracking the underlying partitioner used to partition the sorted data, along with an average record size for the sorted RDD. These can be used to obtain bounds similar to the partition map, but at a lower cost. Additionally, this data can be created cheaply by peeking at the first item on each partition, which lets us support legacy formats (FASTA, coordinate-sorted SAM/BAM/CRAM, and VCF) where data is sorted on disk but no partition map is available (#1584). When we infer the partitioner, we can determine whether it is sorted lexicographically or by sequence index by comparing the reference sequence order in the partitioner against the sequence dictionary attached to the RDD. Since we then use the inferred partitioner to co-partition data during a shuffle region join, the region join supports both lexicographic and sequence-index sort orders (#1519).
  2. Adds a `sortAndFlank` method to GenomicRDD, which `SortedGenomicRDD` overrides with a shuffle-free variant. Refactors the `pipe` transformation to use `sortAndFlank` instead of shuffling with the `ManualRegionPartitioner`.
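The partitioner-inference idea in point 1 can be sketched outside of Spark: given the reference names observed at the partition bounds and the RDD's sequence dictionary, decide which ordering the data follows. This is an illustrative sketch, not ADAM's actual API; all names and signatures here are hypothetical.

```scala
// Hypothetical sketch of sort-order inference. `names` holds the reference
// names in partition-bound order; `dict` maps reference name -> sequence index.
object SortOrderInference {
  sealed trait SortOrder
  case object Lexicographic extends SortOrder
  case object BySequenceIndex extends SortOrder
  case object Unknown extends SortOrder

  def infer(names: Seq[String], dict: Map[String, Int]): SortOrder = {
    val distinct = names.distinct
    // lexicographic: bound order matches string sort order
    val lexicographic = distinct == distinct.sorted
    // by index: every name is in the dictionary and indices are ascending
    val indices = distinct.map(dict.get)
    val byIndex = indices.forall(_.isDefined) && {
      val xs = indices.flatten
      xs == xs.sorted
    }
    if (lexicographic) Lexicographic
    else if (byIndex) BySequenceIndex
    else Unknown
  }
}
```

For example, bounds ordered `chr1, chr2, chr10` are not lexicographic ("chr10" sorts before "chr2" as a string) but are ascending in a dictionary that assigns those contigs indices 0, 1, 2.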

Still needs benchmarking, more unit tests in places, and more documentation.

Resolves bigdatagenomics#538. Adds support for Python APIs that use the ADAM Java API to make
the ADAMContext and RDD functions accessible natively through Python.
Resolves bigdatagenomics#1018. Adds the `adam-codegen` module, which generates classes that:

1. Implement the Scala Product interface, and thus can be read into a Spark SQL
Dataset.
2. Have a complete constructor that is compatible with the constructor that
Spark SQL expects to see when exporting a Dataset back to Scala.
3. Have methods for converting to/from the bdg-formats Avro models.

Then, we build these model classes in the `org.bdgenomics.adam.sql` package,
and use them for export from the Avro based GenomicRDDs. With a Dataset, we
can then export to a DataFrame, which enables us to expose data through
Python via RDD->Dataset->DataFrame. This is important since the Avro classes
generated by bdg-formats can't be pickled, and thus we can't do a Java RDD to
Python RDD crossing with them.
Resolves bigdatagenomics#882. Adds an R API that binds around the ADAM GenomicRDD APIs.
Supports Spark 2.x onwards, as the SparkR functionality for binding to Java
libraries was added in Spark 2.0.0.
Resolves bigdatagenomics#1584, bigdatagenomics#1519, bigdatagenomics#1558,
and bigdatagenomics#1583, as described above.
@coveralls

coveralls commented Jul 7, 2017

Coverage Status

Coverage decreased (-1.2%) to 82.104% when pulling f03ba95 on fnothaft:issues/1583-1584-sorted-legacy-formats-and-pipe into 8572fb7 on bigdatagenomics:master.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2176/

Build result: FAILURE

[...truncated 15 lines...]
> /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/pull/:refs/remotes/origin/pr/ # timeout=15
> /home/jenkins/git2/bin/git rev-parse origin/pr/1590/merge^{commit} # timeout=10
> /home/jenkins/git2/bin/git branch -a -v --no-abbrev --contains f76a988 # timeout=10
Checking out Revision f76a988 (origin/pr/1590/merge)
> /home/jenkins/git2/bin/git config core.sparsecheckout # timeout=10
> /home/jenkins/git2/bin/git checkout -f f76a988e38b8e4ca606793dee61d78f2b99f32c3
First time build. Skipping changelog.
Triggering ADAM-prb ? 2.3.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.10,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.3.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.3.0,2.10,2.0.0,centos
ADAM-prb ? 2.3.0,2.10,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.11,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.10,2.0.0,centos completed with result FAILURE
ADAM-prb ? 2.6.0,2.10,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.11,2.0.0,centos completed with result SUCCESS
ADAM-prb ? 2.3.0,2.11,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.3.0,2.11,2.0.0,centos completed with result SUCCESS
ADAM-prb ? 2.3.0,2.10,2.0.0,centos completed with result FAILURE
Notifying endpoint 'HTTP:https://webhooks.gitter.im/e/ac8bb6e9f53357bc8aa8'
Test FAILed.

@fnothaft
Member Author

fnothaft commented Jul 7, 2017

Test failure is due to a mismatch between Jenkins and the changes to jenkins-test.sh; specifically, #1397 changes the Spark 2 integration tests to look for Spark=2.1.0, while Jenkins is setting 2.0.0.

Member

@devin-petersohn devin-petersohn left a comment


I have made a few comments, but I haven't been able to review it all.

I am curious about the motivation for creating all the new classes/traits. Why do we need SortedGenomicRDD trait?

     LexicographicalGenomicRangePartitioner(rightBounds)) => {
  leftBounds.zip(rightBounds).forall(p => {
    p._1.referenceName == p._2.referenceName &&
      p._1.start == p._2.start
Member
This won't solve the problem of determining that the data is copartitioned properly, correct? We are doing overlap joins, so the starts being equivalent doesn't actually tell us whether all data that should be joined together is colocated together. Please let me know if I've missed something.

Member Author

I'd need to look back at the code to confirm 100%, but from memory, the way that this code should work is to use the estimated longest record size from the right RDD (I believe I call this "flank margin") to move records from partition n + 1 to partition n if they are within flank_margin bases from the start of partition n + 1. This is sufficient to satisfy the copartitioning requirement we have.

I'd be glad to put together a formal writeup of this, if you'd like.
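The flank-shift described in that comment can be sketched on plain Scala collections (standing in for Spark partitions). This is a hedged illustration of the idea, not ADAM's implementation: records that begin within `flankMargin` bases of their partition's start move to the preceding partition, so that overlap-join candidates end up co-located.

```scala
// Hypothetical sketch: `Record` and `flankMargin` are illustrative. Each inner
// Vector is one sorted partition; the flank of partition n + 1 moves to
// partition n.
object FlankShift {
  case class Record(start: Long)

  def shift(partitions: Vector[Vector[Record]],
            flankMargin: Long): Vector[Vector[Record]] = {
    // leading records of a partition that fall within flankMargin of its start
    def flankOf(p: Vector[Record]): Vector[Record] = p.headOption match {
      case Some(h) => p.takeWhile(_.start < h.start + flankMargin)
      case None    => Vector.empty
    }
    partitions.zipWithIndex.map { case (part, i) =>
      // drop this partition's own flank (it moved to partition i - 1) ...
      val kept = if (i == 0) part else part.drop(flankOf(part).size)
      // ... and pull in the flank of partition i + 1
      val pulled = if (i + 1 < partitions.size) flankOf(partitions(i + 1))
                   else Vector.empty[Record]
      kept ++ pulled
    }
  }
}
```

With a margin of 10, records starting at 100 and 105 in the second partition would move back to the first, while a record at 200 stays put.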


copy(rdd = newRdd, optPartitionMap = newPartitionMap)
def partitionersAreCompatible(leftRdd: SortedGenomicRDD[_, _],
Member
I like the idea, but I'm not sure it's possible to make it simple enough to be worth computing. Please see my comment below regarding implementation.


protected val partitioner: Partitioner

override final def prepareForShuffleRegionJoin[X, Y <: GenomicRDD[X, Y]](
Member

I have an idea for resolving skew in both sides by sampling from both RDDs and partitioning accordingly. It would be much more accurate and balanced than the previous implementation.

Member Author

Yeah, we should be able to just pick a RangePartitioner that samples from both RDDs, no?
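The skew-resolution idea in this exchange can be sketched without Spark: sample from both inputs, sort the sample, and take evenly spaced split points as partition bounds, so heavy regions in either side get more partitions. Spark's RangePartitioner does something similar for a single RDD; the names and shapes below are illustrative, not a real API.

```scala
import scala.util.Random

// Hypothetical sketch: derive (numPartitions - 1) range bounds from a random
// sample drawn from BOTH inputs, so skew in either side shapes the bounds.
object SampledBounds {
  def bounds(left: Seq[Long],
             right: Seq[Long],
             numPartitions: Int,
             sampleSize: Int,
             seed: Long = 42L): Seq[Long] = {
    val rng = new Random(seed)
    val pool = left ++ right
    // sample with replacement from the combined inputs, then sort
    val sample = Seq.fill(sampleSize)(pool(rng.nextInt(pool.size))).sorted
    // evenly spaced split points over the sorted sample
    (1 until numPartitions).map(i => sample((i * sample.size) / numPartitions))
  }
}
```

If one input is clustered around a hot region, the sample (and hence the bounds) reflect that density, which is the balance property discussed above.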

@@ -806,24 +819,23 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
* None, the number of partitions on the resulting RDD does not change.
* @return a case class containing all the prepared data for ShuffleRegionJoins
*/
private def prepareForShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](
protected def prepareForShuffleRegionJoin[X, Y <: GenomicRDD[X, Y]](
Member

See comment below in SortedGenomicRDD regarding balance of both RDDs.

@fnothaft
Member Author

fnothaft commented Jul 7, 2017

I am curious about the motivation for creating all the new classes/traits. Why do we need SortedGenomicRDD trait?

Adding a sorted trait was the original thing we'd tried doing, no? IIRC, we went down that road but then we ran into problems with mix-ins.

That said, I think the sorted trait is a natural fit because we can leverage sorted knowledge in more places than just the shuffle joins and the pipe API. E.g., in this PR, I added some simple functionality to the variant RDDs where we maintain sort order going from VariantContextRDD -> VariantRDD/GenotypeRDD and VariantRDD -> VariantContextRDD by overriding methods in the Sorted{Genotype,Variant,VariantContext}RDD traits. We'd want to extend this pattern anywhere that we have an isSorted parameter today.
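The mix-in pattern being discussed can be sketched in miniature. This is not ADAM's class hierarchy; all types below are hypothetical stand-ins showing how a Sorted* trait overrides only the operations that sort order lets us skip, while unsorted wrappers keep the default behavior.

```scala
// Illustrative sketch of the Sorted* mix-in pattern, not ADAM's actual API.
trait Positioned { def start: Long }
case class Variant(start: Long) extends Positioned

// Base wrapper: no sort knowledge, so sortAndFlank must sort first.
trait GenomicCollection[T <: Positioned] {
  def records: Seq[T]
  def isSorted: Boolean = false
  // margin is unused here; real flanking logic is elided for brevity
  def sortAndFlank(margin: Long): Seq[T] = records.sortBy(_.start)
}

// Mix-in for sorted data: overrides just what sort order makes cheap.
trait SortedGenomicCollection[T <: Positioned] extends GenomicCollection[T] {
  override def isSorted: Boolean = true
  override def sortAndFlank(margin: Long): Seq[T] = records // shuffle-free
}

case class VariantCollection(records: Seq[Variant])
  extends GenomicCollection[Variant]
case class SortedVariantCollection(records: Seq[Variant])
  extends SortedGenomicCollection[Variant]
```

Order-preserving transformations would return the Sorted* type, which is how the VariantContextRDD -> VariantRDD/GenotypeRDD conversions mentioned above would keep their sort knowledge.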

@heuermh
Member

heuermh commented Sep 27, 2019

@fnothaft Might be a herculean task, but could you attempt to rebase this? If not possible, I'll try to pull out only the relevant changes in a new pull request.

@heuermh
Member

heuermh commented Jan 6, 2020

Closing as WontFix. While I would love to have these changes in, I am not able to push these changes forward. Feel free to create a new PR after rebasing against git head.

@heuermh heuermh closed this Jan 6, 2020
@heuermh heuermh added this to the 0.31.0 milestone Jan 6, 2020

Successfully merging this pull request may close these issues.

Integrate sorted knowledge with pipe API
5 participants