Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes and refactoring methods in SparkUtils #4765

Merged
merged 3 commits into from
May 18, 2018
Merged

Conversation

lbergelson
Copy link
Member

  • fix partitioning bug by moving edge fixing from coordinateSortReads -> querynameSortReads
  • refactor methods to reduce code duplication
  • renaming and moving some methods
  • disallow duplicate sort order on spark because it doesn't work with headerless reads

Copy link
Collaborator

@jamesemery jamesemery left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of relatively minor comments and one thing to change about the sorting API that was gross

@@ -116,8 +111,17 @@ public ReadsSparkSource(final JavaSparkContext ctx, final ValidationStringency v
return (GATKRead) SAMRecordToGATKReadAdapter.headerlessReadAdapter(sam);
}
return null;
}).filter(v1 -> v1 != null);
return putPairsInSamePartition(header, reads, ctx);
}).filter(Objects::nonNull);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a breaking change? We probably don't want to support sorting RDDs with null objects but it feels like there could well be cases involving custom partitioners with empty partitions that come up where they might happen innocuously, this will throw an exception where before we were only filtering the null objects.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this the same filter condition as before, just referrencing Objects::nonNull instead of writing out the filter expression?

case queryname:
case unsorted: return header.getSortOrder().getComparatorInstance();
default: return null; //NOTE: javac warns if you have this (useless) default BUT it errors out if you remove this default.
}
}

/**
* do a total sort of an RDD so that all the elements in partition i are less than those in partition i+1 according to the given comparator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this implies that it is only grouping by partition, not doing a global sort of everything, including elements within each partition.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

improved comments

String firstName = current.peek().getName();
// Make sure we don't remove reads from the first partition
if (!firstGroupInBam.equals(firstName)) {
// skip the first read name group in the _current_ partition if it is the second in a pair since it will be handled in the previous partition
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove "if it is the second in pair"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/**
* Ensure all reads with the same name appear in the same partition.
* Requires that the No shuffle is needed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment makes no sense, update it to reflect that the reads must be grouped by readname and that this must be reflected in the header you provide

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

// Turn into key-value pairs so we can sort (by key). Values are null so there is no overhead in the amount
// of data going through the shuffle.
final JavaPairRDD<GATKRead, Void> rddReadPairs = reads.mapToPair(read -> new Tuple2<>(read, (Void) null));
public static JavaRDD<GATKRead> querynameSortReads(final JavaRDD<GATKRead> reads, SAMFileHeader header, final int numReducers) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm.... I'm not sure what to say about this class... On the one hand you have sortSamRecordsToMatchHeader() which sorts based on the header you provide it, but then we have querynameSortReads() and coordinateSortReads() which exist as seperate methods if you have GATKreads. Perhaps we should make sortGATKReadsToMatchHeader() and use a similar switch statement to denote what we haven't implemented and have that method farm out to querynameSortReads() or coordinateSortReads() to be more consistent? At the very least sortGATKReadsToMatchHeader() should exist.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the individual sorts and replaced with sortReadsAccordingToHeader

Utils.validateArg(ReadUtils.isReadNameGroupedBam(header), () -> "Reads must be queryname grouped or sorted. " +
"Actual sort:" + header.getSortOrder() + " Actual grouping:" +header.getGroupOrder());
int numPartitions = reads.getNumPartitions();
final String firstGroupInBam = reads.first().getName();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to firstNameInBam

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

int numPartitions = reads.getNumPartitions();
final String firstGroupInBam = reads.first().getName();
// Find the first group in each partition
List<List<GATKRead>> firstReadNamesInEachPartition = reads
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to firstReadNameGroupInEachPartition

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/**
* assert that the iterator is sorted according to the comparator
*/
public static <T> void assertSorted(Iterator<T> iterator, Comparator<T> comparator){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good utility to add

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

@Test
public void testSortCoordinateSort() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to reflect that it is asserting equivalence to htsjdk coordinate sort (or add comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done for both

}

@Test
public void testSortQuerynameSort() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

@jamesemery jamesemery assigned lbergelson and unassigned jamesemery May 14, 2018
* fix partitioning bug by moving edge fixing from coordinateSortReads -> querynameSortReads
* refactor methods to reduce code duplication
* renaming and moving some methods
* disallow duplicate sort order because it doesn't work with headerless reads
@lbergelson lbergelson force-pushed the lb_sparkutils_fix branch from e311302 to 16e6179 Compare May 17, 2018 20:42
@lbergelson
Copy link
Member Author

@jamesemery I think I've responded to everything. Please take another look when you get a chance.

@lbergelson lbergelson assigned jamesemery and unassigned lbergelson May 17, 2018
@lbergelson
Copy link
Member Author

Also, don't be afraid to use the "request changes" box when publishing a review. This was definitely a request changes situation.

@codecov-io
Copy link

codecov-io commented May 17, 2018

Codecov Report

Merging #4765 into master will increase coverage by 0.062%.
The diff coverage is 88.889%.

@@               Coverage Diff               @@
##              master     #4765       +/-   ##
===============================================
+ Coverage     80.094%   80.156%   +0.062%     
- Complexity     17401     17497       +96     
===============================================
  Files           1080      1082        +2     
  Lines          63078     63275      +197     
  Branches       10176     10208       +32     
===============================================
+ Hits           50522     50719      +197     
+ Misses          8570      8568        -2     
- Partials        3986      3988        +2
Impacted Files Coverage Δ Complexity Δ
...stitute/hellbender/tools/HaplotypeCallerSpark.java 83.158% <100%> (ø) 25 <0> (ø) ⬇️
...transforms/markduplicates/MarkDuplicatesSpark.java 97.297% <100%> (+1.379%) 31 <0> (+15) ⬆️
...nder/tools/spark/pipelines/ReadsPipelineSpark.java 89.13% <100%> (ø) 12 <0> (ø) ⬇️
...ender/engine/spark/datasources/ReadsSparkSink.java 77.027% <70%> (-0.671%) 33 <6> (+4)
...der/engine/spark/datasources/ReadsSparkSource.java 79.545% <83.333%> (-2.506%) 31 <2> (-13)
...broadinstitute/hellbender/utils/test/BaseTest.java 65.972% <90.909%> (+2.062%) 37 <4> (+4) ⬆️
...adinstitute/hellbender/utils/spark/SparkUtils.java 84.524% <92.5%> (+11.797%) 21 <15> (+9) ⬆️
...ead/markduplicates/sparkrecords/EmptyFragment.java 86.207% <0%> (-2.028%) 16% <0%> (+9%)
...ils/read/markduplicates/sparkrecords/Fragment.java 100% <0%> (ø) 20% <0%> (+11%) ⬆️
...titute/hellbender/engine/TwoPassVariantWalker.java 95% <0%> (ø) 4% <0%> (?)
... and 6 more

Copy link
Collaborator

@jamesemery jamesemery left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me, feel free to merge 👍

* @param numReducers the number of reducers to use; a value of 0 means use the default number of reducers
* @return a sorted RDD of reads
*/
public static JavaRDD<SAMRecord> sortSamRecordsToMatchHeader(final JavaRDD<SAMRecord> reads, final SAMFileHeader header, final int numReducers) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to worry about the repartitioning for SamRecords here? It might be just a hair too much especially because we would have to rewrite our partitioner somewhere... hmm...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was not meant to be a public method...

@lbergelson lbergelson merged commit 54c96b7 into master May 18, 2018
@lbergelson lbergelson deleted the lb_sparkutils_fix branch May 18, 2018 21:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants