From 5e7ca3210fdb95ccff152b60db19b1beba187d34 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 26 Jan 2023 23:48:27 -0500 Subject: [PATCH] Add documentation, changelog --- CHANGES.md | 2 ++ sdks/python/apache_beam/ml/inference/base.py | 13 +++++++------ .../documentation/sdks/python-machine-learning.md | 11 ++++++++++- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 18b7ed989fb6..2f03b8025c5d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,8 @@ * Adding override of allowed TLS algorithms (Java), now maintaining the disabled/legacy algorithms present in 2.43.0 (up to 1.8.0_342, 11.0.16, 17.0.2 for respective Java versions). This is accompanied by an explicit re-enabling of TLSv1 and TLSv1.1 for Java 8 and Java 11. +* RunInference PTransform will accept Singleton SideInputs in Python SDK. ([#24042](https://github.com/apache/beam/issues/24042)) + ## Breaking Changes diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py index 1150b39921fd..aa19ebcf9e38 100644 --- a/sdks/python/apache_beam/ml/inference/base.py +++ b/sdks/python/apache_beam/ml/inference/base.py @@ -310,7 +310,7 @@ def __init__( inference_args: Optional[Dict[str, Any]] = None, metrics_namespace: Optional[str] = None, *, - model_path_pcoll: beam.PCollection[ModelMetdata] = None): + model_metadata_pcoll: beam.PCollection[ModelMetdata] = None): """A transform that takes a PCollection of examples (or features) for use on an ML model. The transform then outputs inferences (or predictions) for those examples in a PCollection of PredictionResults that contains the input @@ -328,14 +328,15 @@ def __init__( inference_args: Extra arguments for models whose inference call requires extra parameters. metrics_namespace: Namespace of the transform to collect metrics. - model_path_pcoll: PCollection that emits model path - that is used as a side input to the _RunInferenceDoFn. 
+ model_metadata_pcoll: PCollection that emits Singleton ModelMetadata + containing model path and model name, that is used as a side input + to the _RunInferenceDoFn. """ self._model_handler = model_handler self._inference_args = inference_args self._clock = clock self._metrics_namespace = metrics_namespace - self._model_path_pcoll = model_path_pcoll + self._model_metadata_pcoll = model_metadata_pcoll # TODO(BEAM-14046): Add and link to help documentation. @classmethod @@ -365,7 +366,7 @@ def expand( # batching DoFn APIs. | beam.BatchElements(**self._model_handler.batch_elements_kwargs())) - enable_side_input_loading = self._model_path_pcoll is not None + enable_side_input_loading = self._model_metadata_pcoll is not None return ( batched_elements_pcoll | 'BeamML_RunInference' >> ( @@ -377,7 +378,7 @@ def expand( enable_side_input_loading), self._inference_args, beam.pvalue.AsSingleton( - self._model_path_pcoll, + self._model_metadata_pcoll, ) if enable_side_input_loading else None).with_resource_hints( **resource_hints))) diff --git a/website/www/site/content/en/documentation/sdks/python-machine-learning.md b/website/www/site/content/en/documentation/sdks/python-machine-learning.md index e24abdf7e0cc..4b58d1f33c80 100644 --- a/website/www/site/content/en/documentation/sdks/python-machine-learning.md +++ b/website/www/site/content/en/documentation/sdks/python-machine-learning.md @@ -183,7 +183,8 @@ For more information, see [`KeyedModelHander`](https://beam.apache.org/releases/ When doing a prediction in Apache Beam, the output `PCollection` includes both the keys of the input examples and the inferences. Including both these items in the output allows you to find the input that determined the predictions. -The `PredictionResult` is a `NamedTuple` object that contains both the input and the inferences, named `example` and `inference`, respectively. 
When keys are passed with the input data to the RunInference transform, the output `PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and the `PredictionResult` object. Your pipeline interacts with a `PredictionResult` object in steps after the RunInference transform. +The `PredictionResult` is a `NamedTuple` object that contains the input, the inferences, and the model_id, +named `example`, `inference`, and `model_id`, respectively. When keys are passed with the input data to the RunInference transform, the output `PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and the `PredictionResult` object. Your pipeline interacts with a `PredictionResult` object in steps after the RunInference transform. ``` class PostProcessor(beam.DoFn): @@ -217,6 +218,14 @@ For more information, see the [`PredictionResult` documentation](https://github. For detailed instructions explaining how to build and run a pipeline that uses ML models, see the [Example RunInference API pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) on GitHub. +## Side Inputs to Update Models +From Beam 2.45.0, the RunInference PTransform will accept a side input of `ModelMetadata`, which is a `NamedTuple` containing the `model_id` and `model_name`. + * `model_id`: The model_id is used to load the models. It could be a URI or a path to the model. + * `model_name`: Unique identifier used to append the metrics. This should be short relative to the model_id so that it can be attached to the metrics to identify which model was used to calculate the metrics. + +**Note**: If the main PCollection emits inputs and the side input has yet to receive inputs, the main PCollection will get buffered until there is + an update to the side input. This could happen with global windowed side inputs with data-driven triggers, such as `AfterCount` or `AfterProcessingTime`. 
Until there is an update to the side input, the default/initial model id that was used to create the respective `ModelHandler` will be emitted. + ## Beam Java SDK support The RunInference API is available with the Beam Java SDK versions 2.41.0 and later through Apache Beam's [Multi-language Pipelines framework](/documentation/programming-guide/#multi-language-pipelines). For information about the Java wrapper transform, see [RunInference.java](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java). To try it out, see the [Java Sklearn Mnist Classification example](https://github.com/apache/beam/tree/master/examples/multi-language).