Skip to content

Commit

Permalink
MLTransform (apache#26795)
Browse files Browse the repository at this point in the history
* Initial work on MLTransform and ProcessHandler

* Support for containers: List, Dict[str, np.ndarray]
pass types
Support Pyarrow schema
Artifact WIP

* Add  min, max, artifacts for scale_0_to_1

* Add more transform functions and artifacts
WIP on inferring types
Remove pyarrow implementation
Add MLTransformOutput
Refactor files

* Add generic type annotations

* Add unit tests
Fix artifacts code
Add more tests
fix lint erors
Change namespaces from ml_transform to transforms
Add doc strings
Add tests and refactor

* Add support for saving intermediate results for a transform
Sort imports
Add metrics namespaces
Refactor

* Add schema to the output PCollection

* Remove MLTransformOutput and return Row instead with schema

* Convert primitive type to list using a DoFn. Remove FixedLenFeatureSpec

Make VarLenFeatureSpec as default
Refactoring

* Add append_transform to the ProcessHandler
Some more refactoring

* Remove param self.has_artifacts, add artifact_location to handler..and address PR comments
Add skip conditions for tests
Add test suite for tft tests

* Move tensorflow import into the try except catch
Try except in __init__.py
Remove imports from __init__
Add docstrings, refactor

* Add type annotations for the data transforms

* Add tft test in tox.ini

Mock tensorflow_transform in pydocs
fix tft pypi name

Skip a test
Add step name
Update  supported versions of TFT

* Add step name for TFTProcessHandler

* Remove unsupported tft versions

* Fix mypy

* Refactor TFTProcessHandlerDict to TFTProcessHandlerSchema

* Update doc for data processing transforms

* Fix checking the typing container types

* Refactor code

* Fail TFTProcessHandler on a non-global window PColl

* Remove underscore

* Remove high level functions

* Add TFIDF

* Fix tests with new changes[WIP]

* Fix tests

* Refactor class name to CamelCase and remove kwrags

* use is_default instead of isinstance

* Remove falling back to staging location for artifact location

* Add TFIDF tests

* Remove __str__

* Refactor skip statement

* Add utils for fetching artifacts on compute and apply vocab

* Make ProcessHandler internal class

* Only run analyze stage when transform_fn(artifacts) is not computed before.

* Fail if pipeline has non default window during artifact producing stage

* Add support for Dict, recordbatch and introduce artifact_mode

* Hide process_handler from user. Make TFTProcessHandler as default

* Refactor few tests

* Comment a test

* Save raw_data_meta_data so that it can be used during consume stage

* Refactor code

* Add test on artifacts

* Fix imports

* Add tensorflow_metadata to pydocs

* Fix test

* Add TFIDF to import

* Add basic example

* Remove redundant logging statements

* Add test for multiple columns on MLTransform

* Add todo about what to do when new process handler is introduced

* Add abstractmethod decorator

* Edit Error message

* Update docs, error messages

* Remove record batch input/output arg

* Modify generic types

* Fix import sort

* Fix mypy errors - best effort

* Fix tests

* Add TFTOperation doc

* Rename tft_transform  to tft

* Fix hadler_test

* Fix base_test

* Fix pydocs
  • Loading branch information
AnandInguva authored and cushon committed May 24, 2024
1 parent a55cbfb commit ca86df1
Show file tree
Hide file tree
Showing 12 changed files with 2,212 additions and 1 deletion.
118 changes: 118 additions & 0 deletions sdks/python/apache_beam/examples/ml_transform/ml_transform_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This example demonstrates how to use MLTransform.
MLTransform is a PTransform that applies multiple data transformations on the
incoming data.
This example computes the vocabulary on the incoming data. Then, it computes
the TF-IDF of the incoming data using the vocabulary computed in the previous
step.
1. ComputeAndApplyVocabulary computes the vocabulary on the incoming data and
overrides the incoming data with the vocabulary indices.
2. TFIDF computes the TF-IDF of the incoming data using the vocabulary and
provides vocab_index and tf-idf weights. vocab_index is suffixed with
'_vocab_index' and tf-idf weights are suffixed with '_tfidf' to the
original column name(which is the output of ComputeAndApplyVocabulary).
MLTransform produces artifacts, for example: ComputeAndApplyVocabulary produces
a text file that contains vocabulary which is saved in `artifact_location`.
ComputeAndApplyVocabulary outputs vocab indices associated with the saved vocab
file. This mode of MLTransform is called artifact `produce` mode.
This will be useful when the data is preprocessed before ML model training.
The second mode of MLTransform is artifact `consume` mode. In this mode, the
transformations are applied on the incoming data using the artifacts produced
by the previous run of MLTransform. This mode will be useful when the data is
preprocessed before ML model inference.
"""

import argparse
import logging
import tempfile

import apache_beam as beam
from apache_beam.ml.transforms.base import ArtifactMode
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import TFIDF
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
from apache_beam.ml.transforms.utils import ArtifactsFetcher


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--artifact_location', type=str, default='')
return parser.parse_known_args()


def run(args):
data = [
dict(x=["Let's", "go", "to", "the", "park"]),
dict(x=["I", "enjoy", "going", "to", "the", "park"]),
dict(x=["I", "enjoy", "reading", "books"]),
dict(x=["Beam", "can", "be", "fun"]),
dict(x=["The", "weather", "is", "really", "nice", "today"]),
dict(x=["I", "love", "to", "go", "to", "the", "park"]),
dict(x=["I", "love", "to", "read", "books"]),
dict(x=["I", "love", "to", "program"]),
]

with beam.Pipeline() as p:
input_data = p | beam.Create(data)

# arfifacts produce mode.
input_data |= (
'MLTransform' >> MLTransform(
artifact_location=args.artifact_location,
artifact_mode=ArtifactMode.PRODUCE,
).with_transform(ComputeAndApplyVocabulary(
columns=['x'])).with_transform(TFIDF(columns=['x'])))

# _ = input_data | beam.Map(logging.info)

with beam.Pipeline() as p:
input_data = [
dict(x=['I', 'love', 'books']), dict(x=['I', 'love', 'Apache', 'Beam'])
]
input_data = p | beam.Create(input_data)

# artifacts consume mode.
input_data |= (
MLTransform(
artifact_location=args.artifact_location,
artifact_mode=ArtifactMode.CONSUME,
# you don't need to specify transforms as they are already saved in
# in the artifacts.
))

_ = input_data | beam.Map(logging.info)

# To fetch the artifacts after the pipeline is run
artifacts_fetcher = ArtifactsFetcher(artifact_location=args.artifact_location)
vocab_list = artifacts_fetcher.get_vocab_list()
assert vocab_list[22] == 'Beam'


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
args, pipeline_args = parse_args()
# for this example, create a temp artifact location if not provided.
if args.artifact_location == '':
args.artifact_location = tempfile.mkdtemp()
run(args)
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/ml/transforms/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
165 changes: 165 additions & 0 deletions sdks/python/apache_beam/ml/transforms/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pytype: skip-file

import abc
from typing import Generic
from typing import List
from typing import Optional
from typing import Sequence
from typing import TypeVar

import apache_beam as beam

__all__ = ['MLTransform', 'ProcessHandler', 'BaseOperation']

TransformedDatasetT = TypeVar('TransformedDatasetT')
TransformedMetadataT = TypeVar('TransformedMetadataT')

# Input/Output types to the MLTransform.
ExampleT = TypeVar('ExampleT')
MLTransformOutputT = TypeVar('MLTransformOutputT')

# Input to the apply() method of BaseOperation.
OperationInputT = TypeVar('OperationInputT')
# Output of the apply() method of BaseOperation.
OperationOutputT = TypeVar('OperationOutputT')


class ArtifactMode(object):
PRODUCE = 'produce'
CONSUME = 'consume'


class BaseOperation(Generic[OperationInputT, OperationOutputT], abc.ABC):
def __init__(self, columns: List[str]) -> None:
"""
Base Opertation class data processing transformations.
Args:
columns: List of column names to apply the transformation.
"""
self.columns = columns

@abc.abstractmethod
def apply(
self, data: OperationInputT, output_column_name: str) -> OperationOutputT:
"""
Define any processing logic in the apply() method.
processing logics are applied on inputs and returns a transformed
output.
Args:
inputs: input data.
"""


class ProcessHandler(Generic[ExampleT, MLTransformOutputT], abc.ABC):
"""
Only for internal use. No backwards compatibility guarantees.
"""
@abc.abstractmethod
def process_data(
self, pcoll: beam.PCollection[ExampleT]
) -> beam.PCollection[MLTransformOutputT]:
"""
Logic to process the data. This will be the entrypoint in
beam.MLTransform to process incoming data.
"""

@abc.abstractmethod
def append_transform(self, transform: BaseOperation):
"""
Append transforms to the ProcessHandler.
"""


class MLTransform(beam.PTransform[beam.PCollection[ExampleT],
beam.PCollection[MLTransformOutputT]],
Generic[ExampleT, MLTransformOutputT]):
def __init__(
self,
*,
artifact_location: str,
artifact_mode: str = ArtifactMode.PRODUCE,
transforms: Optional[Sequence[BaseOperation]] = None):
"""
Args:
artifact_location: A storage location for artifacts resulting from
MLTransform. These artifacts include transformations applied to
the dataset and generated values like min, max from ScaleTo01,
and mean, var from ScaleToZScore. Artifacts are produced and stored
in this location when the `artifact_mode` is set to 'produce'.
Conversely, when `artifact_mode` is set to 'consume', artifacts are
retrieved from this location. Note that when consuming artifacts,
it is not necessary to pass the transforms since they are inherently
stored within the artifacts themselves. The value assigned to
`artifact_location` should be a valid storage path where the artifacts
can be written to or read from.
transforms: A list of transforms to apply to the data. All the transforms
are applied in the order they are specified. The input of the
i-th transform is the output of the (i-1)-th transform. Multi-input
transforms are not supported yet.
artifact_mode: Whether to produce or consume artifacts. If set to
'consume', the handler will assume that the artifacts are already
computed and stored in the artifact_location. Pass the same artifact
location that was passed during produce phase to ensure that the
right artifacts are read. If set to 'produce', the handler
will compute the artifacts and store them in the artifact_location.
The artifacts will be read from this location during the consume phase.
There is no need to pass the transforms in this case since they are
already embedded in the stored artifacts.
"""
# avoid circular import
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.ml.transforms.handlers import TFTProcessHandler
# TODO: When new ProcessHandlers(eg: JaxProcessHandler) are introduced,
# create a mapping between transforms and ProcessHandler since
# ProcessHandler is not exposed to the user.
process_handler: ProcessHandler = TFTProcessHandler(
artifact_location=artifact_location,
artifact_mode=artifact_mode,
transforms=transforms) # type: ignore[arg-type]

self._process_handler = process_handler

def expand(
self, pcoll: beam.PCollection[ExampleT]
) -> beam.PCollection[MLTransformOutputT]:
"""
This is the entrypoint for the MLTransform. This method will
invoke the process_data() method of the ProcessHandler instance
to process the incoming data.
process_data takes in a PCollection and applies the PTransforms
necessary to process the data and returns a PCollection of
transformed data.
Args:
pcoll: A PCollection of ExampleT type.
Returns:
A PCollection of MLTransformOutputT type.
"""
return self._process_handler.process_data(pcoll)

def with_transform(self, transform: BaseOperation):
"""
Add a transform to the MLTransform pipeline.
Args:
transform: A BaseOperation instance.
Returns:
A MLTransform instance.
"""
self._process_handler.append_transform(transform)
return self
Loading

0 comments on commit ca86df1

Please sign in to comment.