Implementing Python Bounded Source Reader DoFn #13154

Merged: 9 commits, Nov 3, 2020

@pabloem pabloem commented Oct 20, 2020

This is valuable for BigQuery repeatedly firing side input. This PR is intended to be used here: #13170

This makes the SDF Bounded Source reader available to use. A small change in functionality:

  • If no source is provided to the initial restriction in the constructor, then the element is expected to be a source, and it's added to the initial restriction at creation.
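
The fallback described above can be sketched in plain Python. This is only an illustration, not the code in this PR: `StandInBoundedSource` and `SourceResolver` are hypothetical names that stand in for `BoundedSource` and the restriction provider, and no Beam APIs are used.

```python
class StandInBoundedSource:
    """Hypothetical stand-in for Beam's BoundedSource (not the real class)."""
    def __init__(self, name):
        self.name = name


class SourceResolver:
    """Hypothetical helper: picks the source the initial restriction is built from."""
    def __init__(self, source=None):
        # The constructor source is optional; validate it only when given.
        if source is not None and not isinstance(source, StandInBoundedSource):
            raise TypeError('source must be a StandInBoundedSource')
        self._source = source

    def resolve(self, element):
        # A source given at construction time is used as-is.
        if self._source is not None:
            return self._source
        # Otherwise the element itself is expected to be the source.
        if not isinstance(element, StandInBoundedSource):
            raise TypeError(
                'element must be a StandInBoundedSource when no source is given')
        return element


fixed = StandInBoundedSource('known at construction time')
from_element = StandInBoundedSource('arrives as an element')

assert SourceResolver(fixed).resolve(None) is fixed
assert SourceResolver().resolve(from_element) is from_element
```

In this sketch a non-source element (including `None`) is rejected only when no constructor source exists, which is the case where the element has to carry the source.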

r: @boyuanzz


pabloem commented Oct 20, 2020

Run Python 3.8 PostCommit

@pabloem pabloem closed this Oct 23, 2020
@pabloem pabloem reopened this Oct 23, 2020
pabloem commented Oct 24, 2020

Run Python 3.8 PostCommit

pabloem commented Oct 24, 2020

Run PythonDocker PreCommit

@boyuanzz boyuanzz left a comment

I'm curious: do we have a plan to build an actual SDF for BQ instead of still relying on the BoundedSource implementation?

@@ -1618,3 +1628,48 @@ def display_data(self):
        'source': DisplayDataItem(self.source.__class__, label='Read Source'),
        'source_dd': self.source
    }


class SDFBoundedSourceReader(PTransform):
It seems like the major difference between SDFBoundedSourceWrapper and SDFBoundedSourceReader is that SDFBoundedSourceWrapper takes the source as a constructor parameter, whereas SDFBoundedSourceReader takes the source as an input element. We could change the implementation of SDFBoundedSourceWrapper as well.

I've done this, but I still allow the source to come in via the constructor as well as via an input element. The intention is to keep the display data for simple Read transforms where the source is known at construction time.

I see. I thought we were still keeping _SDFBoundedSourceWrapper. Thanks for the clarification!

I'm wondering whether it would be better for SDFBoundedSourceReader to take data_to_display as a constructor argument, if anything, instead of taking the source directly. What do you think?

pabloem commented Oct 28, 2020

I'm curious do we have a plan to build actual SDF for BQ instead of still relying on BoundedSource implementation?

In this case, we will have a simple DoFn that starts the read from BQ, but it eventually returns multiple Avro file sources that can be read individually. This is different from what we had before, where all of the BQ reading logic was part of a BoundedSource. In fact, the _CustomBigQuerySource will be removed eventually.
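
As a rough plain-Python sketch of that shape (not the actual BigQuery or Beam code): one step fans a read request out into per-file sources, and a second step reads each source element on its own. `StandInFileSource`, `start_read`, `read_each`, and the file names are hypothetical and exist only for this illustration.

```python
from typing import Dict, Iterable, Iterator, List


class StandInFileSource:
    """Hypothetical stand-in for one exported Avro file source."""
    def __init__(self, path: str, rows: List[dict]):
        self.path = path
        self.rows = rows

    def read(self) -> Iterator[dict]:
        yield from self.rows


def start_read(exported: Dict[str, List[dict]]) -> Iterator[StandInFileSource]:
    """Plays the role of the simple DoFn: one read request fans out into
    one source per exported file."""
    for path, rows in exported.items():
        yield StandInFileSource(path, rows)


def read_each(sources: Iterable[StandInFileSource]) -> Iterator[dict]:
    """Plays the role of the source reader: every incoming element is a
    source, and each one is read individually."""
    for source in sources:
        yield from source.read()


# Made-up export result: two files holding three rows in total.
exported = {
    'part-0.avro': [{'id': 1}, {'id': 2}],
    'part-1.avro': [{'id': 3}],
}
rows = list(read_each(start_read(exported)))
assert rows == [{'id': 1}, {'id': 2}, {'id': 3}]
```

Because the sources arrive as elements rather than being fixed at construction time, the reading step does not need to know how many files the first step will produce.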

pabloem commented Oct 28, 2020

Run Python 3.8 PostCommit

initializes restriction based on input element that is expected to be of
BoundedSource type.
"""
def __init__(self, source: BoundedSource = None, desired_chunk_size=None):
We should be able to remove source here?

@boyuanzz boyuanzz left a comment

Thanks! Changes look good to me except some minor comments.

"""
A `RestrictionProvider` that is used by SDF for `BoundedSource`.

If source is provided, uses it for initializing restriction. Otherwise
It seems like we need to update the pydoc here as well.

self._desired_chunk_size = desired_chunk_size

def _check_source(self, src):
if src is not None and not isinstance(src, BoundedSource):
The src cannot be None, right?

@boyuanzz

I'm curious do we have a plan to build actual SDF for BQ instead of still relying on BoundedSource implementation?

In this case, we will have a simple DoFn that starts the read from BQ, but it eventually returns multiple Avro file sources that can be read individually. This is different from what we had before, where all of the BQ reading logic was part of a BoundedSource. In fact, the _CustomBigQuerySource will be removed eventually.

I see. It seems like you will use SDFBoundedSourceReader in your BQ readAll transform. I think it would be nice not to build anything new directly on top of BoundedSource, since overall we want to deprecate BoundedSource in the future. The bounded SDF wrapper is there to help us do the migration smoothly. What do you think?

pabloem commented Oct 29, 2020

Run Python 3.8 PostCommit

pabloem commented Oct 29, 2020

I'm curious do we have a plan to build actual SDF for BQ instead of still relying on BoundedSource implementation?

In this case, we will have a simple DoFn that starts the read from BQ, but it eventually returns multiple Avro file sources that can be read individually. This is different from what we had before, where all of the BQ reading logic was part of a BoundedSource. In fact, the _CustomBigQuerySource will be removed eventually.

I see. It seems like you will use SDFBoundedSourceReader in your BQ readAll transform. I think it would be nice not to build anything new directly on top of BoundedSource, since overall we want to deprecate BoundedSource in the future. The bounded SDF wrapper is there to help us do the migration smoothly. What do you think?

I think that's reasonable. If any improvements are made to ReadAllFromBQ, we can make sure that they are done without relying on BoundedSource then.

pabloem commented Oct 29, 2020

Run Portable_Python PreCommit

pabloem commented Nov 3, 2020

Run Python 3.8 PostCommit

@pabloem pabloem merged commit c3cf904 into apache:master Nov 3, 2020
@pabloem pabloem deleted the bsreadersdf branch November 3, 2020 23:28