Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sideinputs to the RunInference Transform #25200

Merged
merged 54 commits into from
Feb 2, 2023

Conversation

AnandInguva
Copy link
Contributor

@AnandInguva AnandInguva commented Jan 27, 2023

Part 1 of #24042

  • RunInference transform will accept side input emitting ModelMetadata.
  • The Side input must emit one value at a time. It should be compatible with AsSingleton view
  • All the metrics are tagged with unique id so that it would be easy to differentiate the metrics calculated by different models.
  • Add a RunInference pipeline visitor to check if the side inputs are enabled only during --streaming mode and to. check if the side input has windows other than GlobalWindows with non default trigger.

This method follows slowly changing side input patterns as described in https://beam.apache.org/documentation/patterns/side-inputs/

TODO:
Part 2: WatchFileTransform and it's use case. It will come in the following PRs.
Part 3: DLQ for error outputs.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

fix up parentheses
Add time.sleep
Update PredictionResult
Update Pcollection type hint
Add model_path param in the ModelHandler
Update model path in pytorch_inference.py
Add model path to sklearn model
Add model id to sklearn inference PredictionResult
clean up base.py
Update utils
update base.py
Update base.py
Add default value
Add prefix/human readable id to ModelMetadata
Fix snippets tests
revert build settings
Fix lint
Add model_id to PredictionResult
@AnandInguva
Copy link
Contributor Author

Run Python 3.8 PostCommit

@AnandInguva
Copy link
Contributor Author

R : @damccorm

@codecov
Copy link

codecov bot commented Jan 27, 2023

Codecov Report

Merging #25200 (637d3c3) into master (01aa470) will decrease coverage by 0.02%.
The diff coverage is 57.89%.

@@            Coverage Diff             @@
##           master   #25200      +/-   ##
==========================================
- Coverage   72.96%   72.95%   -0.02%     
==========================================
  Files         743      745       +2     
  Lines       99037    99174     +137     
==========================================
+ Hits        72267    72348      +81     
- Misses      25404    25460      +56     
  Partials     1366     1366              
Flag Coverage Δ
python 82.45% <57.89%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...thon/apache_beam/ml/inference/pytorch_inference.py 0.00% <0.00%> (ø)
...hon/apache_beam/ml/inference/tensorrt_inference.py 0.00% <0.00%> (ø)
...am/examples/inference/run_inference_side_inputs.py 35.00% <35.00%> (ø)
sdks/python/apache_beam/ml/inference/base.py 96.12% <98.07%> (+0.35%) ⬆️
...thon/apache_beam/ml/inference/sklearn_inference.py 96.25% <100.00%> (-0.05%) ⬇️
sdks/python/apache_beam/ml/inference/utils.py 100.00% <100.00%> (ø)
...eam/runners/portability/fn_api_runner/execution.py 92.49% <0.00%> (-0.64%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.54% <0.00%> (+0.12%) ⬆️
...on/apache_beam/runners/dataflow/dataflow_runner.py 81.88% <0.00%> (+0.14%) ⬆️
...python/apache_beam/runners/worker/worker_status.py 76.66% <0.00%> (+1.33%) ⬆️

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@damccorm
Copy link
Contributor

damccorm commented Jan 31, 2023

Precommit issues should be fixed by #25234

@AnandInguva
Copy link
Contributor Author

Run PythonDocker PreCommit

@AnandInguva
Copy link
Contributor Author

Run Portable_Python PreCommit

@AnandInguva
Copy link
Contributor Author

Run Python Dataflow V2 ValidatesRunner

@AnandInguva
Copy link
Contributor Author

AnandInguva commented Jan 31, 2023

@damccorm I added the test to be in the validates DataflowRunner suite since I see that it is the test I see where all the side streaming tests are running.

PTAL

if (self._run_inference_contains_side_input and
not self._options.view_as(StandardOptions).streaming):
raise RuntimeError(
"SideInputs to RunInference PTransform is only supported "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this side input, the target use cases would reading from unbounded sources or using PeriodicImpulse(which doesn't work as expected in batch mode).

So by default, the model update side input is only restricted for streaming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically, because batch mode can completely process precondition stages before processing dependent stages, the meaning of pipeline updates breaks down (what does updating to the latest version of the model mean when all inferences are done at the "same time"), and if you do have a streaming source (which our built ins will be), the pipeline can hang forever.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In batch, the "latest" version of the model is likely to be the only version of the model, but that doesn't mean it can't be computed lazily. This also avoids having to have two separate codepaths (known at construction time can be "upgraded" to Create + Side Input).

If you do have a streaming source, the pipeline should automatically run in streaming mode. (Also, IMHO, if just because a mode is more useful in one mode than the other doesn't mean we should prohibit it in the other unless it's actually detrimental.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do have a streaming source, the pipeline should automatically run in streaming mode.

AFAIK, this isn't actually true in Python today.

if just because a mode is more useful in one mode than the other doesn't mean we should prohibit it in the other unless it's actually detrimental.

I think its detrimental in the batch use case because understanding the behavior requires understanding runner internals (in this case, how Dataflow/other runners handle stages), and the behavior is very confusing if you don't understand those internals. This is antithetical to one of the purposes of RunInference: taking a hard beam thing and making it easy.

I want RunInference users to be able to think about Beam as little as possible when building their pipelines, even if it comes at the cost of some of the purity of the unified model.


One possible reconciliation here would be to emit a warning in batch mode with an informative error message, or allow users to intentionally enable running in batch mode with a parameter (though neither of those seem like awesome experiences, and I'd still personally favor explicitly disallowing this).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do have a streaming source, the pipeline should automatically run in streaming mode.

AFAIK, this isn't actually true in Python today.

If that's the case, we should definitely fix this :).

if just because a mode is more useful in one mode than the other doesn't mean we should prohibit it in the other unless it's actually detrimental.

I think its detrimental in the batch use case because understanding the behavior requires understanding runner internals (in this case, how Dataflow/other runners handle stages), and the behavior is very confusing if you don't understand those internals. This is antithetical to one of the purposes of RunInference: taking a hard beam thing and making it easy.

Could you clarify a bit more why understanding runner internals is required for understanding what happens in the batch case (or, in other words, what confusing behavior the user would run into)? I'm not proposing we do away with the "easy" mode when the model is known at compile time (for batch or streaming), rather that we allow its computation to be deferred in both modes if this is explicitly what the user asks for.

I want RunInference users to be able to think about Beam as little as possible when building their pipelines, even if it comes at the cost of some of the purity of the unified model.

Yes, for sure.

One possible reconciliation here would be to emit a warning in batch mode with an informative error message, or allow users to intentionally enable running in batch mode with a parameter (though neither of those seem like awesome experiences, and I'd still personally favor explicitly disallowing this).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do have a streaming source, the pipeline should automatically run in streaming mode.

AFAIK, this isn't actually true in Python today.

If that's the case, we should definitely fix this :).

I tend to agree, but IIRC there were concerns about doing this (@tvalentyn thoughts?). It may have just been a matter of us not getting around to doing it yet. I would at least be worried about our existing IOs behaving correctly if we were to change this behavior, I'm not sure how crisp we've been here and I know of at least one (PeriodicImpulse/Sequence) that wasn't correctly marking itself unbounded. That's all solvable though, and probably isn't made worse by the change regardless.

Could you clarify a bit more why understanding runner internals is required for understanding what happens in the batch case (or, in other words, what confusing behavior the user would run into)? I'm not proposing we do away with the "easy" mode when the model is known at compile time (for batch or streaming), rather that we allow its computation to be deferred in both modes if this is explicitly what the user asks for.

If the following things happened:

  1. Pipeline starts with model A
  2. 10,000 records arrive for inference + I do some preprocessing
  3. Model updates to model B
  4. 10,000 records arrive
  5. Model updates to model C
  6. 1 more records arrives

If I'm in batch mode, all 20,001 records get processed by model C (I actually think this depends on the runner, correct me if I'm wrong). If I'm in streaming mode, model A and model B each process 10,000 records, model C processes 1 record. The streaming case is by far the more useful one, and almost definitely the use case that most people are after with this kind of feature, but the discrepancy here is odd if you don't know how the model works.

With all that said, I can see cases where the batch use case would be nice (e.g. it allows daily pipelines without updating your model manually) - I think the framing of "deferred computation" vs "live model updates" helps there, and I could get behind allowing this. Though, the idea of a single unchanged transform for batch and streaming pipelines still seems fairly unrealistic to me because the boundedness of the side input source would need to be changed to match the boundedness of the main source in most cases to be useful.

My biggest sticking point is still that AFAIK unbounded sources don't automatically put pipelines in streaming mode, which leads to unexplained pipeline hangs in batch mode for this use case. Whether we change that or not, my vote would be to not block this PR on that change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do have a streaming source, the pipeline should automatically run in streaming mode.

AFAIK, this isn't actually true in Python today.

If that's the case, we should definitely fix this :).

I tend to agree, but IIRC there were concerns about doing this (@tvalentyn thoughts?). It may have just been a matter of us not getting around to doing it yet. I would at least be worried about our existing IOs behaving correctly if we were to change this behavior, I'm not sure how crisp we've been here and I know of at least one (PeriodicImpulse/Sequence) that wasn't correctly marking itself unbounded. That's all solvable though, and probably isn't made worse by the change regardless.

Filed #25264 ; we can continue discussion there (including if there's any downsides to this change, though I don't see any). If there is undesirable behavior, like hanging, when unbounded sources are used, this is entirely orthogonal to RunInferrence.

Could you clarify a bit more why understanding runner internals is required for understanding what happens in the batch case (or, in other words, what confusing behavior the user would run into)? I'm not proposing we do away with the "easy" mode when the model is known at compile time (for batch or streaming), rather that we allow its computation to be deferred in both modes if this is explicitly what the user asks for.

If the following things happened:

  1. Pipeline starts with model A
  2. 10,000 records arrive for inference + I do some preprocessing
  3. Model updates to model B
  4. 10,000 records arrive
  5. Model updates to model C
  6. 1 more records arrives

If I'm in batch mode, all 20,001 records get processed by model C (I actually think this depends on the runner, correct me if I'm wrong). If I'm in streaming mode, model A and model B each process 10,000 records, model C processes 1 record. The streaming case is by far the more useful one, and almost definitely the use case that most people are after with this kind of feature, but the discrepancy here is odd if you don't know how the model works.

One can't have this scenario for batch. In batch, one has 20,001 records and three models. None of the models (or records) are "first" and if anything there is ambiguity which model should be used. (Actually, with AsSingleton, an error will generally be raised if there's more than one.)

Now if the model (and records) are appropriately timestamped (this works in batch), they'll all get the "right" model. Bonus: this makes testing a lot easier.

With all that said, I can see cases where the batch use case would be nice (e.g. it allows daily pipelines without updating your model manually) - I think the framing of "deferred computation" vs "live model updates" helps there, and I could get behind allowing this. Though, the idea of a single unchanged transform for batch and streaming pipelines still seems fairly unrealistic to me because the boundedness of the side input source would need to be changed to match the boundedness of the main source in most cases to be useful.

A bounded side input with an unbounded main input is easy to understand, and the other way around can work too (especially if one is using timestamped data) though a bit odd. But we shouldn't fall into the trap that RunInferrence is always going to be at the top level (i.e. at the same level of the Reads)--it may be used in an even higher-level composite for example.

My biggest sticking point is still that AFAIK unbounded sources don't automatically put pipelines in streaming mode, which leads to unexplained pipeline hangs in batch mode for this use case. Whether we change that or not, my vote would be to not block this PR on that change.

If using an unbounded side input causes hangs, yes, let's warn on that (ideally as high a level as makes sense, it's not a RunInference-specific issue) until this is fixed. But I'd rather not prohibit side inputs altogether.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #25264 ;

Thanks

None of the models (or records) are "first" and if anything there is ambiguity which model should be used.

I meant first by timestamp. For the model at least (if not the main input), I think this is a reasonable understanding of how the non-Beam person will see the world.

Now if the model (and records) are appropriately timestamped (this works in batch), they'll all get the "right" model.

I didn't think this was the case, and it alleviates my concerns here.

If using an unbounded side input causes hangs, yes, let's warn on that (ideally as high a level as makes sense, it's not a RunInference-specific issue) until this is fixed. But I'd rather not prohibit side inputs altogether.

I'm fine with this if Anand is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had an offline conversation with Danny with this and we both tend to agree on Robert's view.

I imposed the restriction because it would be easier to restrictive at first and then introduce new features(SI for batches) based on the feedback but now I think it is important to follow the beam unified model concept and remove the restriction on the side input.

Thanks

sdks/python/apache_beam/pipeline.py Outdated Show resolved Hide resolved

_ = inference_pcoll | "Logging" >> beam.Map(logging.info)

test_pipeline.run().wait_until_finish()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The with Pipeline() as pipeline syntax is generally preferred to a manual .run().wait_until_finish() call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed it. thanks

@AnandInguva
Copy link
Contributor Author

@damccorm Added a couple of tests and removed the restrictions.

PTAL.

@AnandInguva
Copy link
Contributor Author

Run Python Dataflow V2 ValidatesRunner

Copy link
Contributor

@damccorm damccorm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - thanks! (needed to resolve some conflicts in CHANGES.md, but I just did that - I'll merge when checks complete again)

CHANGES.md Outdated
@@ -69,6 +69,7 @@
present in 2.43.0 (up to 1.8.0_342, 11.0.16, 17.0.2 for respective Java versions). This is accompanied
by an explicit re-enabling of TLSv1 and TLSv1.1 for Java 8 and Java 11.
* Add UDF metrics support for Samza portable mode.
* RunInference PTransform will accept Singleton SideInputs in Python SDK. ([#24042](https://github.com/apache/beam/issues/24042))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, actually this won't be released in 2.45 - could you please create a new section for 2.46? (same thing Luke did here - 8ec0568#diff-d975bf659606195d2165918f93e1cf680ef68ea3c9cab994f033705fea8238b2)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, thanks for catching it.

CHANGES.md Outdated Show resolved Hide resolved
@damccorm
Copy link
Contributor

damccorm commented Feb 2, 2023

I'll merge once checks pass

@damccorm
Copy link
Contributor

damccorm commented Feb 2, 2023

Remaining checks all passed, they just didn't status for some reason

@damccorm damccorm merged commit 10805a2 into apache:master Feb 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants