165 changes: 154 additions & 11 deletions sdks/python/apache_beam/yaml/examples/README.md
@@ -20,43 +20,51 @@
# Examples Catalog

<!-- TOC -->

* [Examples Catalog](#examples-catalog)
  * [Wordcount](#wordcount)
  * [Transforms](#transforms)
    * [Aggregation](#aggregation)
    * [Blueprints](#blueprints)
    * [Element-wise](#element-wise)
    * [IO](#io)
    * [ML](#ml)

<!-- TOC -->

## Prerequisites

Build this jar so that it is available to the run command in the next section:

```sh
cd <path_to_beam_repo>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
```

## Example Run

This module contains a series of Beam YAML code samples that can be run using
the command:

```sh
python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/example.yaml
```

Depending on the YAML pipeline, the output may be emitted to standard output or
to a file in the directory from which the pipeline is run.

## Wordcount

A good starting place is the [Wordcount](wordcount_minimal.yaml) example under
the root example directory.
This example reads in a text file, splits the text into individual words, groups
by each word, and counts the occurrences of each word. This is a classic example
used in the other SDKs and shows off many of the capabilities of Beam YAML.
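
For orientation, a minimal word-count pipeline in Beam YAML has roughly the
following shape. This is a simplified sketch rather than the exact contents of
`wordcount_minimal.yaml`; in particular, the input path, the `line` field name,
and the word-splitting callable are assumptions made for illustration.

```yaml
pipeline:
  type: chain
  transforms:
    # Read lines of text; each output row is assumed to carry a `line` field.
    - type: ReadFromText
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt
    # Split each line into a list of lowercase words.
    - type: MapToFields
      config:
        language: python
        fields:
          word:
            callable: |
              import re
              def split_words(row):
                return re.findall(r"[a-z']+", row.line.lower())
    # Emit one row per word.
    - type: Explode
      config:
        fields: word
    # Group by word and count occurrences.
    - type: Combine
      config:
        group_by: word
        combine:
          count:
            value: word
            fn: count
    # Write the per-word counts out as text.
    - type: WriteToText
      config:
        path: counts.txt
```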

## Testing

A test file located in the testing folder executes all the example YAML
pipelines and confirms the expected results.

```sh
pytest -v testing/
```

@@ -71,25 +79,160 @@ Examples in this directory show off the various built-in transforms of the Beam
YAML framework.

### Aggregation

These examples leverage the built-in `Combine` transform for performing simple
aggregations including sum, mean, count, etc.
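
As a hedged illustration of the `Combine` syntax (the field names here are
invented for the example and do not come from a specific file in this
directory), a grouped sum looks roughly like this:

```yaml
- type: Combine
  config:
    language: python
    group_by: produce        # field to group rows by
    combine:
      total_quantity:        # name of the aggregated output field
        value: quantity      # field aggregated within each group
        fn: sum              # built-in aggregation fn (sum, mean, count, ...)
```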

### Blueprints

These examples take existing Dataflow templates and other reference pipelines
and convert them to YAML blueprints.

### Element-wise

These examples leverage the built-in mapping transforms including `MapToFields`,
`Filter` and `Explode`. More information about mapping transforms can be found
[here](https://beam.apache.org/documentation/sdks/yaml-udf/).
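
The sketch below shows the general shape of these mapping transforms as they
would appear under a pipeline's `transforms` list; the field names and
expressions are illustrative assumptions, not taken from a specific example:

```yaml
# Keep only rows that satisfy a predicate.
- type: Filter
  config:
    language: python
    keep: "quantity > 0"
# Derive or pass through fields with simple expressions.
- type: MapToFields
  config:
    language: python
    fields:
      produce: produce
      price_cents: "int(price * 100)"
# Emit one row per element of a list-valued field.
- type: Explode
  config:
    fields: produce
```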

### IO

#### Spanner

Examples [Spanner Read](transforms/io/spanner_read.yaml) and [Spanner Write](
transforms/io/spanner_write.yaml) leverage the built-in `Spanner_Read` and
`Spanner_Write` transforms for performing simple reads from and writes to a
Google Cloud Spanner database.

#### Kafka

Examples involving Kafka such as [Kafka Read Write](transforms/io/kafka.yaml)
require users to set up a Kafka cluster that the Dataflow runner executing the
Beam pipeline can access.
Note that the `ReadFromKafka` transform has
a [known issue](https://github.com/apache/beam/issues/22809) when
using non-Dataflow portable runners where reading may get stuck in streaming
pipelines. The Dataflow runner is therefore recommended for examples that
involve reading from Kafka in a streaming pipeline.

See [here](https://kafka.apache.org/quickstart) for general instructions on
setting up a Kafka cluster. One option is to use [Click to Deploy](
https://console.cloud.google.com/marketplace/details/click-to-deploy-images/kafka?)
to quickly launch a Kafka cluster on [GCE](
https://cloud.google.com/products/compute?hl=en). The [SASL/PLAIN](
https://kafka.apache.org/documentation/#security_sasl_plain) authentication
mechanism is configured for the brokers as part of that deployment. See
also [here](
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/dataflow/flex-templates/kafka_to_bigquery)
for an alternative step-by-step guide to setting up Kafka on GCE without the
authentication mechanism.

Let's assume one of the bootstrap servers runs on VM instance `kafka-vm-0`, has
the internal IP address `123.45.67.89`, and listens on port `9092`. The
SASL/PLAIN `USERNAME` and `PASSWORD` can be viewed in the VM instance's metadata
on the GCE console, or with the gcloud CLI:

```sh
gcloud compute instances describe kafka-vm-0 \
  --format='value[](metadata.items.kafka-user)'
gcloud compute instances describe kafka-vm-0 \
  --format='value[](metadata.items.kafka-password)'
```

The Beam pipeline [Kafka Read Write](transforms/io/kafka.yaml) first writes data to
the Kafka topic using the `WriteToKafka` transform and then reads that data back
using the `ReadFromKafka` transform. Run the pipeline:

```sh
export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://MY-BUCKET/tmp"
export REGION="us-central1"
export JOB_NAME="demo-kafka-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"

python -m apache_beam.yaml.main \
  --yaml_pipeline_file transforms/io/kafka.yaml \
  --runner DataflowRunner \
  --temp_location $TEMP_LOCATION \
  --project $PROJECT \
  --region $REGION \
  --num_workers $NUM_WORKERS \
  --job_name $JOB_NAME \
  --jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
    "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'
```
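
For reference, the Jinja variables above are expected to feed the Kafka
transform configurations roughly as sketched below. This is an illustrative
outline only; the exact `format` value and surrounding configuration in
`kafka.yaml` may differ, though `producer_config_updates`, `consumer_config`,
and the standard Kafka SASL/PLAIN client properties are the relevant pieces.

```yaml
- type: WriteToKafka
  config:
    format: RAW
    topic: "{{ TOPIC }}"
    bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
    producer_config_updates:
      "security.protocol": SASL_PLAINTEXT
      "sasl.mechanism": PLAIN
      "sasl.jaas.config": >-
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="{{ USERNAME }}" password="{{ PASSWORD }}";
- type: ReadFromKafka
  config:
    format: RAW
    topic: "{{ TOPIC }}"
    bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
    consumer_config:
      "security.protocol": SASL_PLAINTEXT
      "sasl.mechanism": PLAIN
      "sasl.jaas.config": >-
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="{{ USERNAME }}" password="{{ PASSWORD }}";
```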

**_Optional_**: If the Kafka cluster is set up without SASL/PLAIN authentication
configured for the brokers, no SASL/PLAIN `USERNAME` and `PASSWORD` are needed.
In the pipelines, omit the configurations `producer_config_updates` and
`consumer_config` from the `WriteToKafka` and `ReadFromKafka` transforms.
Run the commands above without specifying the username and password in the
`--jinja_variables` flag.

#### Iceberg

The Beam pipelines [Iceberg Write](transforms/io/iceberg_write.yaml) and
[Iceberg Read](transforms/io/iceberg_read.yaml) are examples of how to interact
with Iceberg tables stored on GCS with a Hadoop catalog configured.

To create a GCS bucket to serve as the warehouse storage,
see [here](https://cloud.google.com/storage/docs/creating-buckets#command-line).
To run the pipelines locally, one option is to create a service account key in
order to access GCS (see
[here](https://cloud.google.com/iam/docs/keys-create-delete#creating)).
Within the pipelines, specify the GCS bucket name and the path to the saved
service account key JSON file.

**_Note_**: With a Hadoop catalog, Iceberg uses the Hadoop connector for GCS.
See [here](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md)
for the full list of configuration options for the Hadoop catalog when used with GCS.
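
Concretely, the bucket and key file typically appear in the transform's catalog
configuration along the following lines. This is only a sketch: the catalog
name, warehouse path, and the Hadoop GCS connector property names below are
assumptions, so check the example YAML files and the linked configuration page
for the authoritative keys.

```yaml
- type: ReadFromIceberg
  config:
    table: db.users.NY
    catalog_name: hadoop_catalog
    catalog_properties:
      type: hadoop
      warehouse: "gs://MY-BUCKET/iceberg"
    config_properties:
      # Hadoop GCS connector auth settings (assumed key names) pointing at the
      # service account key created above.
      "fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
      "fs.gs.auth.service.account.json.keyfile": "/path/to/key.json"
```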

To create and write to Iceberg tables on GCS, run:

```sh
python -m apache_beam.yaml.main \
--yaml_pipeline_file transforms/io/iceberg_write.yaml
```

The pipeline uses [dynamic destinations](
https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
to dynamically create and select a table destination based on field
values in the incoming records.

To read from a created Iceberg table on GCS, run:

```sh
python -m apache_beam.yaml.main \
--yaml_pipeline_file transforms/io/iceberg_read.yaml
```

**_Optional_**: To run the pipeline on Dataflow, a service account key is
[not needed](
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/INSTALL.md).
Omit the authentication settings in the Hadoop catalog configuration
`config_properties`, and run:

```sh
export REGION="us-central1"
export JOB_NAME="demo-iceberg_write-`date +%Y%m%d-%H%M%S`"

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file transforms/io/iceberg_write.yaml \
  --region $REGION
```

```sh
export REGION="us-central1"
export JOB_NAME="demo-iceberg_read-`date +%Y%m%d-%H%M%S`"

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file transforms/io/iceberg_read.yaml \
  --region $REGION
```

### ML

These examples leverage the built-in `Enrichment` transform for performing
ML enrichments.
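
As a rough sketch (the handler name and configuration keys are assumptions
patterned on the Python `BigTableEnrichmentHandler`, and the values are
placeholders), an enrichment step looks something like:

```yaml
- type: Enrichment
  config:
    enrichment_handler: BigTable
    handler_config:
      project_id: MY-PROJECT
      instance_id: beam-test
      table_id: bigtable-enrichment-test
      row_key: product_id   # field in the input rows used as the Bigtable row key
```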

More information about aggregation transforms can be found
[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
95 changes: 89 additions & 6 deletions sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -58,7 +58,7 @@ def test_enrichment(

  This PTransform simulates the behavior of the Enrichment transform by
  looking up data from predefined in-memory tables based on the provided
  `enrichment_handler` and `handler_config`.

  Note: The Github action that invokes these tests does not have gcp
  dependencies installed which is a prerequisite to
@@ -111,10 +111,25 @@ def _fn(row):
  return pcoll | beam.Map(_fn)


@beam.ptransform.ptransform_fn
def test_kafka_read(
    pcoll,
    format,
    topic,
    bootstrap_servers,
    auto_offset_reset_config,
    consumer_config):
  return (
      pcoll | beam.Create(input_data.text_data().split('\n'))
      | beam.Map(lambda element: beam.Row(payload=element.encode('utf-8'))))


TEST_PROVIDERS = {
    'TestEnrichment': test_enrichment, 'TestReadFromKafka': test_kafka_read
}

INPUT_TRANSFORM_TEST_PROVIDERS = ['TestReadFromKafka']


def check_output(expected: List[str]):
"""
Expand Down Expand Up @@ -184,7 +199,11 @@ def test_yaml_example(self):
    actual = [
        yaml_transform.expand_pipeline(
            p,
            pipeline_spec,
            [
                yaml_provider.InlineProvider(
                    TEST_PROVIDERS, INPUT_TRANSFORM_TEST_PROVIDERS)
            ])
    ]
    if not actual[0]:
      actual = list(p.transforms_stack[0].parts[-1].outputs.values())
@@ -373,9 +392,30 @@ def _wordcount_test_preprocessor(
      env.input_file('kinglear.txt', '\n'.join(lines)))


@YamlExamplesTestSuite.register_test_preprocessor('test_kafka_yaml')
def _kafka_test_preprocessor(
    test_spec: dict, expected: List[str], env: TestEnvironment):

  test_spec = replace_recursive(
      test_spec,
      'ReadFromText',
      'path',
      env.input_file('kinglear.txt', input_data.text_data()))

  if pipeline := test_spec.get('pipeline', None):
    for transform in pipeline.get('transforms', []):
      if transform.get('type', '') == 'ReadFromKafka':
        transform['type'] = 'TestReadFromKafka'

  return test_spec


@YamlExamplesTestSuite.register_test_preprocessor([
    'test_simple_filter_yaml',
    'test_simple_filter_and_combine_yaml',
    'test_iceberg_read_yaml',
    'test_iceberg_write_yaml',
    'test_kafka_yaml',
    'test_spanner_read_yaml',
    'test_spanner_write_yaml',
    'test_enrich_spanner_with_bigquery_yaml'
@@ -417,9 +457,10 @@ def _io_write_test_preprocessor(
def _file_io_read_test_preprocessor(
    test_spec: dict, expected: List[str], env: TestEnvironment):
  """
  This preprocessor replaces any file IO ReadFrom transform with a Create
  transform that reads from a predefined in-memory dictionary. This allows
  the test to verify the pipeline's correctness without relying on external
  files.

  Args:
    test_spec: The dictionary representation of the YAML pipeline specification.
@@ -445,6 +486,47 @@ def _file_io_read_test_preprocessor(
  return test_spec


@YamlExamplesTestSuite.register_test_preprocessor(['test_iceberg_read_yaml'])
def _iceberg_io_read_test_preprocessor(
    test_spec: dict, expected: List[str], env: TestEnvironment):
  """
  Preprocessor for tests that involve reading from Iceberg.

  This preprocessor replaces any ReadFromIceberg transform with a Create
  transform that reads from a predefined in-memory dictionary. This allows
  the test to verify the pipeline's correctness without relying on Iceberg
  tables stored externally.

  Args:
    test_spec: The dictionary representation of the YAML pipeline specification.
    expected: A list of strings representing the expected output of the
      pipeline.
    env: The TestEnvironment object providing utilities for creating temporary
      files.

  Returns:
    The modified test_spec dictionary with ReadFromIceberg transforms replaced.
  """
  if pipeline := test_spec.get('pipeline', None):
    for transform in pipeline.get('transforms', []):
      if transform.get('type', '') == 'ReadFromIceberg':
        config = transform['config']
        (db_name, table_name,
         field_value_dynamic_destinations) = config['table'].split('.')

        transform['type'] = 'Create'
        transform['config'] = {
            k: v
            for k, v in config.items() if k.startswith('__')
        }
        transform['config']['elements'] = INPUT_TABLES[(
            str(db_name),
            str(table_name),
            str(field_value_dynamic_destinations))]

  return test_spec


@YamlExamplesTestSuite.register_test_preprocessor(
    ['test_spanner_read_yaml', 'test_enrich_spanner_with_bigquery_yaml'])
def _spanner_io_read_test_preprocessor(
@@ -531,6 +613,7 @@ def _enrichment_test_preprocessor(
    spanner_shipments_data(),
    ('orders-test', 'order-database', 'orders'): input_data.
    spanner_orders_data(),
    ('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
    ('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
    bigtable_data(),
    ('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data()