Skip to content

Conversation

@angoenka
Copy link
Contributor

Java Sdk Harness used pCollections to keep track of computed consumers here. This is incorrect as consumers are based on pTransforms so pTransforms should be used to keep track of computed consumers.

In case of Flatten, this creates an issue where pTransforms having same input as that to flatten are not executed. This causes

public void testFlattenMultiplePCollectionsHavingMultipleConsumers() {
to fail.

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
--- --- --- ---

@angoenka angoenka force-pushed the fix_java_sdk_process_bundle branch from 171b1e0 to bd55ae5 Compare July 30, 2018 03:16
@angoenka
Copy link
Contributor Author

Run Java PreCommit

@angoenka angoenka force-pushed the fix_java_sdk_process_bundle branch from bd55ae5 to 7959444 Compare July 30, 2018 23:48
@angoenka angoenka force-pushed the fix_java_sdk_process_bundle branch from 7959444 to 8d96644 Compare August 1, 2018 20:36
@angoenka
Copy link
Contributor Author

angoenka commented Aug 1, 2018

R: @bsidhom @ryan-williams @tweise

@ryan-williams
Copy link
Contributor

I'm not familiar with this part of the code, but the change makes sense based on your explanation in the OSS Runners chat.

Slightly obligatory: is there a test that could be added to verify the new/correct behavior?

Copy link
Contributor

@bsidhom bsidhom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, but just echoing Ryan's question: is it possible to validate this does what we want? Or is this already addressed by failing ValidatesRunner tests that fail before but succeed after this change?

processBundleDescriptor.getPcollectionsMap(),
processBundleDescriptor.getCodersMap(),
processBundleDescriptor.getWindowingStrategiesMap(),
pCollectionIdsToConsumers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How/when was pCollectionIdsToConsumers modified previously? I would like to know whether this is the right place to update processedPTransformIds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pCollectionIdsToConsumers is consumed at the time of bundle execution to lookup consumer for the pCollection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant when were elements added?

@angoenka
Copy link
Contributor Author

angoenka commented Aug 2, 2018

There are no unit tests specific to ProcessBundleHandler.java
Most of the functionality is tested using VR tests at the moment.
We should add it but I don't want to do it in this PR if adding unit tests is not trivial.
I can file a jira to track it.

@lukecwik lukecwik self-requested a review August 6, 2018 18:21
@lukecwik
Copy link
Member

lukecwik commented Aug 6, 2018

This change looks good to me. Thanks for figuring out the flaw in the existing logic since it assumed consumers were only added to the current PCollection that was being iterated over.

Ben, IMO the validates runner test is a better was to make sure that end to end this isn't broken across runners.

I looked at the Python SDK and it builds the transforms by using the topological height of the transforms. We could do something very similar and rely on QueryablePipeline#getTopologicallyOrderedTransforms instead of the current recursive descent logic. This would be a good follow up change if your interested.

@lukecwik
Copy link
Member

lukecwik commented Aug 6, 2018

Run Java PreCommit

@angoenka
Copy link
Contributor Author

angoenka commented Aug 6, 2018

Yes, Topological sort is a better way to tackle this issue in more logical manner. Current implementation is very close to what topological sort would do but as we already have means to do topological sort, we should use QueryablePipeline#getTopologicallyOrderedTransforms
Jira to track this https://issues.apache.org/jira/browse/BEAM-5090

@lukecwik
Copy link
Member

lukecwik commented Aug 6, 2018

Run Java PreCommit

1 similar comment
@lukecwik
Copy link
Member

lukecwik commented Aug 6, 2018

Run Java PreCommit

Copy link
Contributor

@bsidhom bsidhom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants