Remove support for Dataflow classic templates (#1789)
relud authored Aug 17, 2021
1 parent 3e0426e commit fa98ac0
Showing 54 changed files with 365 additions and 708 deletions.
2 changes: 1 addition & 1 deletion bin/mvn
@@ -45,5 +45,5 @@ docker run $INTERACTIVE_FLAGS --rm \
-e GOOGLE_APPLICATION_CREDENTIALS \
$(for e in $PASS_ENV; do echo "-e $e"; done) \
$MOUNT_CREDENTIALS_FLAGS \
maven:3-jdk-8 \
maven:3-jdk-11 \
mvn -Duser.home=/var/maven "$@"
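
For reference, `bin/mvn` is used like a local `mvn` binary; a minimal usage sketch, assuming Docker is available and the script is invoked from the repository root:

```bash
# compile and run the test suite inside the maven:3-jdk-11 container
./bin/mvn clean test

# build the project jars, skipping tests (mirrors the Dockerfile build step)
./bin/mvn package -Dmaven.test.skip=true
```
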
26 changes: 10 additions & 16 deletions docs/architecture/overview.md
@@ -80,29 +80,23 @@ This document specifies the architecture for GCP Ingestion as a whole.
matching destinations
- Must accept configuration enabling republishing of messages to a debug
topic if they contain an `x_debug_id` attribute
- Must accept a compile-time parameter enabling or disabling debug republishing
- Must accept a runtime parameter defining the destination topic
- Must accept configuration enabling or disabling debug republishing
- Must accept configuration for the destination topic
- Must accept configuration enabling republishing of a random sample of the
input stream
- Must accept a compile-time parameter setting the sample ratio
- Must accept a runtime parameter defining the destination topic
- Must accept configuration for the sample ratio
- Must accept configuration for the destination topic
- Must accept configuration mapping `document_type`s to PubSub topics
- Must accept a compile-time parameter defining a topic pattern string
(may be promoted to runtime if Dataflow adds support for PubSub topic
names defined via `NestedValueProvider`)
- Must accept a compile-time parameter defining which `document_type`s
to republish
- Must accept configuration for the destination topic pattern
- Must accept configuration for which `document_type`s to republish
- Must only deliver messages with configured destinations
- Must accept configuration mapping `document_namespace`s to PubSub topics
- Must accept a compile-time parameter defining a map from document
namespaces to topics
- Must accept configuration for a map from `document_namespace`s to topics
- Must only deliver messages with configured destinations
- Must accept optional configuration for sampling telemetry data
- Must accept a compile-time parameter defining a topic pattern string
(may be promoted to runtime if Dataflow adds support for PubSub topic
names defined via `NestedValueProvider`)
- Must accept compile-time parameters defining the sampling ratio for
each channel (nightly, beta, and release)
- Must accept configuration for the destination topic pattern
- Must accept configuration for the sampling ratio for each channel (nightly,
beta, and release)

### Live Sink

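
To make the "configuration" wording above concrete: with classic templates gone, the Republisher settings become ordinary runtime pipeline options rather than compile-time template parameters. A hypothetical invocation sketch; the flag names here are illustrative, not the actual names defined in RepublisherOptions:

```bash
# Hypothetical flag names, for illustration only; see RepublisherOptions for the real ones.
# Launched in the same style as the sink examples later on this page; the Republisher
# main class and exact options may differ.
./bin/mvn compile exec:java -Dexec.args="\
  --runner=Dataflow \
  --inputType=pubsub \
  --input=projects/$PROJECT/subscriptions/decoded \
  --enableDebugDestination=true \
  --debugDestination=projects/$PROJECT/topics/debug \
  --randomSampleRatio=0.01 \
  --randomSampleDestination=projects/$PROJECT/topics/sample \
"
```
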
16 changes: 0 additions & 16 deletions docs/architecture/pain_points.md
@@ -36,13 +36,6 @@ in batch mode.

Does not support dynamic destinations.

Does not support [`NestedValueProvider`] for destinations in streaming mode on
Dataflow, which is needed to create classic templates that accept a mapping of
document type to a predetermined number of destinations. This is because
Dataflow moves the implementation into the shuffler to improve performance.
Current workaround is to specify mapping at classic template creation time, or
use Flex Templates.

Does not use standard client library.

Does not expose an output of delivered messages, which is needed for at least
@@ -53,15 +46,6 @@ Uses HTTPS JSON API, which increases message payload size vs protobuf by 25%
for base64 encoding and causes some messages to exceed the 10MB request size
limit that otherwise would not.

[`nestedvalueprovider`]: https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/options/ValueProvider.NestedValueProvider.html

## Templates

Does not support repeated parameters via `ValueProvider<List<...>>`, as
described in [Dataflow Java SDK #632].

[dataflow java sdk #632]: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/632

# PubSub

Can be prohibitively expensive. It costs
84 changes: 19 additions & 65 deletions docs/ingestion-beam/sink-job.md
@@ -259,36 +259,32 @@ gsutil cat $BUCKET/output/*

### On Dataflow with Flex Templates

The Dataflow templates documentation includes [a section explaining the benefits of flex
templates](https://cloud.google.com/dataflow/docs/concepts/dataflow-templates#evaluating-which-template-type-to-use)
The [Dataflow templates documentation](https://cloud.google.com/dataflow/docs/concepts/dataflow-templates) explains:

> Flex Templates bring more flexibility over classic templates by allowing minor variations of
> Dataflow jobs to be launched from a single template and allowing the use of any source or sink
> I/O. For classic templates, the execution graph is built during the template creation process. The
> execution graph for Flex Templates is dynamically built based on runtime parameters provided by
> the user when the template is executed. This means that when you use Flex Templates, you can make
> minor variations to accomplish different tasks with the same underlying template, such as changing
> the source or sink file formats.
> Dataflow templates allow you to stage your pipelines on Google Cloud and [run
> them](https://cloud.google.com/dataflow/docs/templates/executing-templates) using the Google Cloud
> Console, the `gcloud` command-line tool, or REST API calls. [...] Flex Templates package the
> pipeline as a Docker image and stage these images on your project's Container Registry.
```bash
# pick a project to store the docker image in
# pick the project to store the docker image in
PROJECT="$(gcloud config get-value project)"
# pick a region to run Dataflow jobs in
# pick the region to run Dataflow jobs in
REGION="$(gcloud config get-value compute/region)"

# pick a bucket to store files in
# pick the bucket to store files in
BUCKET="gs://$PROJECT"

# configure gcloud credential helper for docker to push to GCR
gcloud auth configure-docker

# build a docker image for a Flex Template
# build the docker image for the Flex Template
export IMAGE=gcr.io/$PROJECT/ingestion-beam/sink:latest
docker-compose build --build-arg FLEX_TEMPLATE_JAVA_MAIN_CLASS=com.mozilla.telemetry.Sink
docker-compose push

# create a Flex Template
# create the Flex Template
gcloud dataflow flex-template build \
$BUCKET/sink/flex-templates/latest.json \
--image $IMAGE \
@@ -322,63 +318,21 @@ gcloud dataflow jobs show "$JOB_ID" --region=$REGION
gsutil cat $BUCKET/output/*
```
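
For context, a built Flex Template is launched with `gcloud dataflow flex-template run`. A minimal sketch, assuming the template spec created by the build step above and illustrative file-to-file parameters:

```bash
# launch the Flex Template from its spec file in GCS
JOBNAME=flex-file-to-file
gcloud dataflow flex-template run $JOBNAME \
  --region=$REGION \
  --template-file-gcs-location=$BUCKET/sink/flex-templates/latest.json \
  --parameters=inputFileFormat=json \
  --parameters=inputType=file \
  --parameters=input=$BUCKET/input.json \
  --parameters=outputFileFormat=json \
  --parameters=outputType=file \
  --parameters=output=$BUCKET/output/ \
  --parameters=errorOutputType=file \
  --parameters=errorOutput=$BUCKET/error
```
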

### On Dataflow with classic templates (deprecated)

Dataflow classic templates make a distinction between
[runtime parameters that implement the `ValueProvider` interface](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#runtime-parameters-and-the-valueprovider-interface)
and compile-time parameters which do not.
All options can be specified at classic template compile time by passing command line flags,
but runtime parameters can also be overridden when
[executing the classic template](https://cloud.google.com/dataflow/docs/guides/templates/executing-templates#using-gcloud)
via the `--parameters` flag.
In the output of `--help=SinkOptions`, runtime parameters are those
with type `ValueProvider`.

```bash
# Pick a bucket to store files in
BUCKET="gs://$(gcloud config get-value project)"

# Set credentials; beam is not able to use gcloud credentials
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your/creds.json"

# create a classic template
./bin/mvn compile exec:java -Dexec.args="\
--runner=Dataflow \
--project=$(gcloud config get-value project) \
--inputFileFormat=json \
--inputType=file \
--outputFileFormat=json \
--outputType=file \
--errorOutputType=file \
--templateLocation=$BUCKET/sink/templates/JsonFileToJsonFile \
--stagingLocation=$BUCKET/sink/staging \
"

# create a test input file
echo '{"payload":"dGVzdA==","attributeMap":{"host":"test"}}' | gsutil cp - $BUCKET/input.json

# run the dataflow classic template with gcloud
JOBNAME=FileToFile1
gcloud dataflow jobs run $JOBNAME --gcs-location=$BUCKET/sink/templates/JsonFileToJsonFile --parameters "input=$BUCKET/input.json,output=$BUCKET/output/,errorOutput=$BUCKET/error"

# get the job id
JOB_ID="$(gcloud dataflow jobs list --filter=name=$JOBNAME --format='value(JOB_ID)' --limit=1)"

# wait for the job to finish
gcloud dataflow jobs show "$JOB_ID"

# check that the message was delivered
gsutil cat $BUCKET/output/*
```

### In streaming mode

If `--inputType=pubsub`, Beam will execute in streaming mode, requiring some
extra configuration for file-based outputs. You will need to specify sharding like:

```
--outputNumShards=10
--errorOutputNumShards=10
--outputNumShards=10 \
--errorOutputNumShards=10 \
```

or for Flex Templates:

```
--parameters=outputNumShards=10 \
--parameters=errorOutputNumShards=10 \
```
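
Put together, a streaming invocation might look like the following sketch, following the `exec:java` style used elsewhere on this page; the Pub/Sub subscription path is an assumption:

```bash
# streaming from Pub/Sub to files; sharding must be set explicitly
./bin/mvn compile exec:java -Dexec.args="\
  --inputType=pubsub \
  --input=projects/$PROJECT/subscriptions/$SUBSCRIPTION \
  --outputFileFormat=json \
  --outputType=file \
  --output=$BUCKET/output/ \
  --errorOutputType=file \
  --errorOutput=$BUCKET/error \
  --outputNumShards=10 \
  --errorOutputNumShards=10 \
"
```
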

As discussed in the
2 changes: 1 addition & 1 deletion ingestion-beam/Dockerfile
@@ -8,7 +8,7 @@ COPY ingestion-beam/checkstyle /app/ingestion-sink/checkstyle
RUN mvn prepare-package
COPY ingestion-core/src /app/ingestion-core/src
COPY ingestion-beam/src /app/ingestion-beam/src
RUN mvn package -Dmaven.test.skip=true
RUN mvn package -Dmaven.test.skip=true -Dmaven.compiler.release=11

# This is the base image for the final image we're building.
# The base image documentation is here:
103 changes: 0 additions & 103 deletions ingestion-beam/bin/build-template

This file was deleted.

@@ -11,7 +11,6 @@
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;

@@ -43,7 +42,7 @@ public static PipelineResult run(String[] args) {
*/
public static PipelineResult run(RepublisherOptions.Parsed options) {
// We aren't decoding payloads, so no need to re-compress when republishing.
options.setOutputPubsubCompression(StaticValueProvider.of(Compression.UNCOMPRESSED));
options.setOutputPubsubCompression(Compression.UNCOMPRESSED);

final Pipeline pipeline = Pipeline.create(options);

@@ -5,30 +5,29 @@
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

public interface ContextualServicesReporterOptions extends SinkOptions, PipelineOptions {

@Description("Path (local or gs://) to CSV file text file containing allowed reporting "
+ "URLs to which requests can be sent to action type (click or impression),"
+ " expected format is url,type with no header")
ValueProvider<String> getUrlAllowList();
String getUrlAllowList();

void setUrlAllowList(ValueProvider<String> value);
void setUrlAllowList(String value);

@Description("Comma-separated strings representing a list of doc types for which "
+ " to send reporting requests; doc types are not namespace qualified "
+ "(e.g. quicksuggest-click is a correct argument)")
ValueProvider<String> getAllowedDocTypes();
String getAllowedDocTypes();

void setAllowedDocTypes(ValueProvider<String> value);
void setAllowedDocTypes(String value);

@Description("If set to true, send HTTP requests to reporting URLs. "
+ "Can be set to false for testing purposes.")
@Default.Boolean(true)
ValueProvider<Boolean> getReportingEnabled();
Boolean getReportingEnabled();

void setReportingEnabled(ValueProvider<Boolean> value);
void setReportingEnabled(Boolean value);

@Description("Duration window for aggregation of reporting requests.")
@Default.String("10m")
@@ -51,9 +50,9 @@ public interface ContextualServicesReporterOptions extends SinkOptions, PipelineOptions {
@Description("If set to true, send successfully requested reporting URLs to"
+ " error output. SendRequests stage does not continue if true.")
@Default.Boolean(true)
ValueProvider<Boolean> getLogReportingUrls();
Boolean getLogReportingUrls();

void setLogReportingUrls(ValueProvider<Boolean> value);
void setLogReportingUrls(Boolean value);

@Hidden
interface Parsed extends ContextualServicesReporterOptions, SinkOptions.Parsed {
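
With `ValueProvider` removed, these options are plain values resolved when the pipeline is constructed, so they can be passed directly as command-line flags (Beam derives the flag names from the getters, e.g. `getUrlAllowList` becomes `--urlAllowList`). A hypothetical sketch with illustrative values:

```bash
# illustrative values only; flag names follow the getters in ContextualServicesReporterOptions
./bin/mvn compile exec:java -Dexec.args="\
  --urlAllowList=$BUCKET/contextual-services/url-allow-list.csv \
  --allowedDocTypes=quicksuggest-click \
  --reportingEnabled=false \
  --logReportingUrls=true \
"
```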