[#24515] Ensure that Go pipelines on Dataflow are not allowed to opt out of runner v2. #24767

Merged · 2 commits · Dec 27, 2022
CHANGES.md (3 additions, 3 deletions)
@@ -67,10 +67,10 @@

 ## Breaking Changes

-* Python streaming pipelines and portable Python batch pipelines on Dataflow are required to
+* Go pipelines, Python streaming pipelines, and portable Python batch pipelines on Dataflow are required to
   use Runner V2. The `disable_runner_v2`, `disable_runner_v2_until_2023`, `disable_prime_runner_v2`
-  experiments will raise an error during pipeline construction. Note that non-portable Python
-  batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515))
+  experiments will raise an error during pipeline construction. You can no longer specify the Dataflow worker
+  jar override. Note that non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)).

 ## Deprecations

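As a user-visible sketch of this breaking change (the pipeline and flag values below are hypothetical, not part of this PR): any Go pipeline launched on Dataflow with one of the removed opt-out experiments now fails during pipeline construction, before anything is submitted.

```go
package main

import (
	"context"
	"flag"
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
	flag.Parse()
	beam.Init()

	p, s := beam.NewPipelineWithRoot()
	debug.Print(s, beam.Create(s, 1, 2, 3))

	// Launched with hypothetical values such as:
	//   go run . --runner=dataflow --project=my-project --region=us-central1 \
	//     --staging_location=gs://my-bucket/staging \
	//     --experiments=disable_runner_v2
	// the run now returns the "Disabling runner v2 is no longer supported"
	// error instead of submitting a legacy-worker job.
	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("failed to execute job: %v", err)
	}
}
```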
runners/google-cloud-dataflow-java/build.gradle (0 additions, 1 deletion)
@@ -455,7 +455,6 @@ createCrossLanguageValidatesRunnerTask(
"--region ${dataflowRegion}",
"--tests \"./test/integration/xlang ./test/integration/io/xlang/...\"",
"--sdk_overrides \".*java.*,${dockerJavaImageContainer}:${dockerTag}\"",
"--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}",
],
)

sdks/go/pkg/beam/runners/dataflow/dataflow.go (48 additions, 17 deletions)
@@ -35,6 +35,7 @@ import (

"cloud.google.com/go/storage"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
@@ -71,7 +72,6 @@
     tempLocation           = flag.String("temp_location", "", "Temp location (optional)")
     machineType            = flag.String("worker_machine_type", "", "GCE machine type (optional)")
     minCPUPlatform         = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
-    workerJar              = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
     workerRegion           = flag.String("worker_region", "", "Dataflow worker region (optional)")
     workerZone             = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
     dataflowServiceOptions = flag.String("dataflow_service_options", "", "Comma separated list of additional job modes and configurations (optional)")
@@ -177,25 +177,26 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
panic("Beam has not been initialized. Call beam.Init() before pipeline construction.")
}

edges, nodes, err := p.Build()
if err != nil {
return nil, err
}
streaming := !graph.Bounded(nodes)

beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
opts, err := getJobOptions(ctx)
opts, err := getJobOptions(ctx, streaming)
if err != nil {
return nil, err
}

// (1) Build and submit
// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker".
id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())

modelURL := gcsx.Join(*stagingLocation, id, "model")
workerURL := gcsx.Join(*stagingLocation, id, "worker")
jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
xlangURL := gcsx.Join(*stagingLocation, id, "xlang")

edges, _, err := p.Build()
if err != nil {
return nil, err
}
artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, opts.Project, xlangURL)
if err != nil {
return nil, errors.WithContext(err, "resolving cross-language artifacts")
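Worth noting for reviewers: p.Build() now runs before getJobOptions so that graph boundedness can decide batch versus streaming. A rough standalone sketch of that decision follows; periodic.Impulse is assumed here as an example of an unbounded source and is not touched by this PR.

```go
package main

import (
	"fmt"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
)

func main() {
	beam.Init()

	p, s := beam.NewPipelineWithRoot()
	// periodic.Impulse (assumed example source): emits an element every
	// 10s for an hour, producing an unbounded PCollection.
	periodic.Impulse(s, time.Now(), time.Now().Add(time.Hour), 10*time.Second, false)

	_, nodes, err := p.Build()
	if err != nil {
		panic(err)
	}
	// Mirrors the new check in Execute: any unbounded node in the built
	// graph means the job is submitted to Dataflow as a streaming pipeline.
	fmt.Println("streaming:", !graph.Bounded(nodes))
}
```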
@@ -221,25 +222,25 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
log.Info(ctx, "Dry-run: not submitting job!")

log.Info(ctx, proto.MarshalTextString(model))
job, err := dataflowlib.Translate(ctx, model, opts, workerURL, jarURL, modelURL)
job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL)
if err != nil {
return nil, err
}
dataflowlib.PrintJob(ctx, job)
return nil, nil
}

return dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, modelURL, *endpoint, *jobopts.Async)
return dataflowlib.Execute(ctx, model, opts, workerURL, modelURL, *endpoint, *jobopts.Async)
}

func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions, error) {
project := gcpopts.GetProjectFromFlagOrEnvironment(ctx)
if project == "" {
return nil, errors.New("no Google Cloud project specified. Use --project=<project>")
}
region := gcpopts.GetRegion(ctx)
if region == "" {
return nil, errors.New("No Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
return nil, errors.New("no Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
}
if *stagingLocation == "" {
return nil, errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
@@ -269,6 +270,9 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
}
}
if !streaming && *transformMapping != "" {
return nil, errors.New("provided transform_name_mapping for a batch pipeline, did you mean to construct a streaming pipeline?")
}
if !*update && *transformMapping != "" {
return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated")
}
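The new guard tightens when --transform_name_mapping is accepted: it must accompany a streaming pipeline and, per the existing check below it, the --update flag. A self-contained sketch of the combined validation (hypothetical helper, not the SDK's API):

```go
package main

import (
	"errors"
	"fmt"
)

// validateUpdateFlags mirrors the two guards above as a standalone sketch
// (hypothetical helper, not part of the SDK).
func validateUpdateFlags(streaming, update bool, transformMapping string) error {
	if transformMapping == "" {
		return nil // nothing to validate
	}
	if !streaming {
		return errors.New("transform_name_mapping is only supported for streaming pipelines")
	}
	if !update {
		return errors.New("transform_name_mapping requires the --update flag")
	}
	return nil
}

func main() {
	// A batch pipeline with a mapping is now rejected up front.
	fmt.Println(validateUpdateFlags(false, true, `{"oldRead":"newRead"}`))
	// A streaming update job with a mapping remains valid.
	fmt.Println(validateUpdateFlags(true, true, `{"oldRead":"newRead"}`))
}
```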
@@ -282,24 +286,51 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
     hooks.SerializeHooksToOptions()

     experiments := jobopts.GetExperiments()
-    // Always use runner v2, unless set already.
-    var v2set, portaSubmission bool
+    // Ensure that we enable the same set of experiments across all SDKs
+    // for runner v2.
+    var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
     for _, e := range experiments {
-        if strings.Contains(e, "use_runner_v2") || strings.Contains(e, "use_unified_worker") {
+        if strings.Contains(e, "beam_fn_api") {
+            fnApiSet = true
+        }
+        if strings.Contains(e, "use_runner_v2") {
             v2set = true
         }
+        if strings.Contains(e, "use_unified_worker") {
+            uwSet = true
+        }
         if strings.Contains(e, "use_portable_job_submission") {
             portaSubmission = true
         }
+        if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") {
+            return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
+        }
     }
-    // Enable by default unified worker, and portable job submission.
+    // Enable default experiments.
+    if !fnApiSet {
+        experiments = append(experiments, "beam_fn_api")
+    }
     if !v2set {
+        experiments = append(experiments, "use_runner_v2")
+    }
+    if !uwSet {
         experiments = append(experiments, "use_unified_worker")
     }
     if !portaSubmission {
         experiments = append(experiments, "use_portable_job_submission")
     }

+    // Ensure that streaming specific experiments are set for streaming pipelines
+    // since runner v2 only supports using streaming engine.
+    if streaming {
+        if !seSet {
+            experiments = append(experiments, "enable_streaming_engine")
+        }
+        if !wsSet {
+            experiments = append(experiments, "enable_windmill_service")
+        }
+    }
+
     if *minCPUPlatform != "" {
         experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform))
     }
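Taken together, the scan and the defaulting blocks pin a fixed experiment baseline for every Go job on Dataflow. A condensed, self-contained sketch of the intended behavior (hypothetical helper, not the SDK's API):

```go
package main

import (
	"fmt"
	"strings"
)

// defaultDataflowExperiments reproduces the defaulting logic above in
// isolation: reject runner v2 opt-outs, then top up the runner v2
// baseline, plus the streaming engine experiments for streaming jobs.
func defaultDataflowExperiments(experiments []string, streaming bool) ([]string, error) {
	has := func(sub string) bool {
		for _, e := range experiments {
			if strings.Contains(e, sub) {
				return true
			}
		}
		return false
	}
	for _, optOut := range []string{"disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2"} {
		if has(optOut) {
			return nil, fmt.Errorf("disabling runner v2 is no longer supported: %v", optOut)
		}
	}
	for _, req := range []string{"beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission"} {
		if !has(req) {
			experiments = append(experiments, req)
		}
	}
	if streaming {
		for _, req := range []string{"enable_streaming_engine", "enable_windmill_service"} {
			if !has(req) {
				experiments = append(experiments, req)
			}
		}
	}
	return experiments, nil
}

func main() {
	exps, _ := defaultDataflowExperiments(nil, true)
	fmt.Println(exps)
	// Output: [beam_fn_api use_runner_v2 use_unified_worker use_portable_job_submission enable_streaming_engine enable_windmill_service]
}
```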
@@ -312,6 +343,7 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
     beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
     opts := &dataflowlib.JobOptions{
         Name:                   jobopts.GetJobName(),
+        Streaming:              streaming,
         Experiments:            experiments,
         DataflowServiceOptions: dfServiceOptions,
         Options:                beam.PipelineOptions.Export(),
@@ -335,7 +367,6 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
         TempLocation:     *tempLocation,
         TemplateLocation: *templateLocation,
         Worker:           *jobopts.WorkerBinary,
-        WorkerJar:        *workerJar,
         WorkerRegion:     *workerRegion,
         WorkerZone:       *workerZone,
         TeardownPolicy:   *teardownPolicy,