diff --git a/CHANGES.md b/CHANGES.md
index 310078a58067..5bbfebec6b1d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,10 +67,10 @@
 
 ## Breaking Changes
 
-* Python streaming pipelines and portable Python batch pipelines on Dataflow are required to
+* Go pipelines, Python streaming pipelines, and portable Python batch pipelines on Dataflow are required to
   use Runner V2. The `disable_runner_v2`, `disable_runner_v2_until_2023`, `disable_prime_runner_v2`
-  experiments will raise an error during pipeline construction. Note that non-portable Python
-  batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515))
+  experiments will raise an error during pipeline construction. You can no longer specify the Dataflow worker
+  jar override. Note that non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515))
 
 ## Deprecations
 
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 8a02d878885a..1df8a1737bff 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -455,7 +455,6 @@ createCrossLanguageValidatesRunnerTask(
     "--region ${dataflowRegion}",
     "--tests \"./test/integration/xlang ./test/integration/io/xlang/...\"",
     "--sdk_overrides \".*java.*,${dockerJavaImageContainer}:${dockerTag}\"",
-    "--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}",
   ],
 )
 
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 598ec4c1aaa5..51f0eff189ce 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -35,6 +35,7 @@ import (
 
 	"cloud.google.com/go/storage"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
@@ -71,7 +72,6 @@ var (
 	tempLocation           = flag.String("temp_location", "", "Temp location (optional)")
 	machineType            = flag.String("worker_machine_type", "", "GCE machine type (optional)")
 	minCPUPlatform         = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
-	workerJar              = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
 	workerRegion           = flag.String("worker_region", "", "Dataflow worker region (optional)")
 	workerZone             = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
 	dataflowServiceOptions = flag.String("dataflow_service_options", "", "Comma separated list of additional job modes and configurations (optional)")
@@ -177,25 +177,26 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
 		panic("Beam has not been initialized. Call beam.Init() before pipeline construction.")
 	}
 
+	edges, nodes, err := p.Build()
+	if err != nil {
+		return nil, err
+	}
+	streaming := !graph.Bounded(nodes)
+
 	beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
-	opts, err := getJobOptions(ctx)
+	opts, err := getJobOptions(ctx, streaming)
 	if err != nil {
 		return nil, err
 	}
 
 	// (1) Build and submit
-	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
+	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker".
 	id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())
 
 	modelURL := gcsx.Join(*stagingLocation, id, "model")
 	workerURL := gcsx.Join(*stagingLocation, id, "worker")
-	jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
 	xlangURL := gcsx.Join(*stagingLocation, id, "xlang")
 
-	edges, _, err := p.Build()
-	if err != nil {
-		return nil, err
-	}
 	artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, opts.Project, xlangURL)
 	if err != nil {
 		return nil, errors.WithContext(err, "resolving cross-language artifacts")
@@ -221,7 +222,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
 		log.Info(ctx, "Dry-run: not submitting job!")
 
 		log.Info(ctx, proto.MarshalTextString(model))
-		job, err := dataflowlib.Translate(ctx, model, opts, workerURL, jarURL, modelURL)
+		job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL)
 		if err != nil {
 			return nil, err
 		}
@@ -229,17 +230,17 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
 		return nil, nil
 	}
 
-	return dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, modelURL, *endpoint, *jobopts.Async)
+	return dataflowlib.Execute(ctx, model, opts, workerURL, modelURL, *endpoint, *jobopts.Async)
 }
 
-func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
+func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions, error) {
 	project := gcpopts.GetProjectFromFlagOrEnvironment(ctx)
 	if project == "" {
 		return nil, errors.New("no Google Cloud project specified. Use --project=")
 	}
 	region := gcpopts.GetRegion(ctx)
 	if region == "" {
-		return nil, errors.New("No Google Cloud region specified. Use --region=. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
+		return nil, errors.New("no Google Cloud region specified. Use --region=. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
 	}
 	if *stagingLocation == "" {
 		return nil, errors.New("no GCS staging location specified. Use --staging_location=gs:///")
 	}
@@ -269,6 +270,9 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
 			return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
 		}
 	}
+	if !streaming && *transformMapping != "" {
+		return nil, errors.New("provided transform_name_mapping for a batch pipeline, did you mean to construct a streaming pipeline?")
+	}
 	if !*update && *transformMapping != "" {
 		return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated")
 	}
@@ -282,24 +286,57 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
 	hooks.SerializeHooksToOptions()
 
 	experiments := jobopts.GetExperiments()
-	// Always use runner v2, unless set already.
-	var v2set, portaSubmission bool
+	// Ensure that we enable the same set of experiments across all SDKs
+	// for runner v2.
+	var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
 	for _, e := range experiments {
-		if strings.Contains(e, "use_runner_v2") || strings.Contains(e, "use_unified_worker") {
+		if strings.Contains(e, "beam_fn_api") {
+			fnApiSet = true
+		}
+		if strings.Contains(e, "use_runner_v2") {
 			v2set = true
 		}
+		if strings.Contains(e, "use_unified_worker") {
+			uwSet = true
+		}
 		if strings.Contains(e, "use_portable_job_submission") {
 			portaSubmission = true
 		}
+		if strings.Contains(e, "enable_streaming_engine") {
+			seSet = true
+		}
+		if strings.Contains(e, "enable_windmill_service") {
+			wsSet = true
+		}
+		if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") {
+			return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
+		}
+	}
+	// Enable default experiments.
+	if !fnApiSet {
+		experiments = append(experiments, "beam_fn_api")
 	}
-	// Enable by default unified worker, and portable job submission.
 	if !v2set {
+		experiments = append(experiments, "use_runner_v2")
+	}
+	if !uwSet {
 		experiments = append(experiments, "use_unified_worker")
 	}
 	if !portaSubmission {
 		experiments = append(experiments, "use_portable_job_submission")
 	}
+	// Ensure that streaming specific experiments are set for streaming pipelines
+	// since runner v2 only supports using streaming engine.
+	if streaming {
+		if !seSet {
+			experiments = append(experiments, "enable_streaming_engine")
+		}
+		if !wsSet {
+			experiments = append(experiments, "enable_windmill_service")
+		}
+	}
+
 	if *minCPUPlatform != "" {
 		experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform))
 	}
@@ -312,6 +349,7 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
 	beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
 	opts := &dataflowlib.JobOptions{
 		Name:                   jobopts.GetJobName(),
+		Streaming:              streaming,
 		Experiments:            experiments,
 		DataflowServiceOptions: dfServiceOptions,
 		Options:                beam.PipelineOptions.Export(),
@@ -335,7 +373,6 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
 		TempLocation:     *tempLocation,
 		TemplateLocation: *templateLocation,
 		Worker:           *jobopts.WorkerBinary,
-		WorkerJar:        *workerJar,
 		WorkerRegion:     *workerRegion,
 		WorkerZone:       *workerZone,
 		TeardownPolicy:   *teardownPolicy,
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
index 737c3ec7b5b5..a8611ecb2ad0 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
@@ -49,21 +49,94 @@ func TestGetJobOptions(t *testing.T) {
 	*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
 	*jobopts.JobName = "testJob"
 
-	opts, err := getJobOptions(context.Background())
+	opts, err := getJobOptions(context.Background(), false)
 	if err != nil {
 		t.Fatalf("getJobOptions() returned error %q, want %q", err, "nil")
 	}
+	if got, want := opts.Streaming, false; got != want {
+		t.Errorf("getJobOptions().Streaming = %t, want %t", got, want)
+	}
+	if got, want := opts.Name, "testJob"; got != want {
+		t.Errorf("getJobOptions().Name = %q, want %q", got, want)
+	}
+	if got, want := len(opts.Experiments), 5; got != want {
+		t.Errorf("len(getJobOptions().Experiments) = %q, want %q", got, want)
+	} else {
+		sort.Strings(opts.Experiments)
+		expectedExperiments := []string{"beam_fn_api", "min_cpu_platform=testPlatform", "use_portable_job_submission", "use_runner_v2", "use_unified_worker"}
+		for i := 0; i < 5; i++ {
+			if got, want 
:= opts.Experiments[i], expectedExperiments[i]; got != want { + t.Errorf("getJobOptions().Experiments[%d] = %q, want %q", i, got, want) + } + } + } + if got, want := len(opts.DataflowServiceOptions), 2; got != want { + t.Errorf("len(getJobOptions().DataflowServiceOptions) = %q, want %q", got, want) + } else { + sort.Strings(opts.DataflowServiceOptions) + expectedOptions := []string{"opt1", "opt2"} + for i := 0; i < 2; i++ { + if got, want := opts.DataflowServiceOptions[i], expectedOptions[i]; got != want { + t.Errorf("getJobOptions().DataflowServiceOptions = %q, want %q", got, want) + } + } + } + if got, want := opts.Project, "testProject"; got != want { + t.Errorf("getJobOptions().Project = %q, want %q", got, want) + } + if got, want := opts.Region, "testRegion"; got != want { + t.Errorf("getJobOptions().Region = %q, want %q", got, want) + } + if got, want := len(opts.Labels), 2; got != want { + t.Errorf("len(getJobOptions().Labels) = %q, want %q", got, want) + } else { + if got, want := opts.Labels["label1"], "val1"; got != want { + t.Errorf("getJobOptions().Labels[\"label1\"] = %q, want %q", got, want) + } + if got, want := opts.Labels["label2"], "val2"; got != want { + t.Errorf("getJobOptions().Labels[\"label2\"] = %q, want %q", got, want) + } + } + if got, want := opts.TempLocation, "gs://testStagingLocation/tmp"; got != want { + t.Errorf("getJobOptions().TempLocation = %q, want %q", got, want) + } + if got, want := opts.FlexRSGoal, "FLEXRS_SPEED_OPTIMIZED"; got != want { + t.Errorf("getJobOptions().FlexRSGoal = %q, want %q", got, want) + } +} + +func TestGetJobOptions_Streaming(t *testing.T) { + resetGlobals() + *labels = `{"label1": "val1", "label2": "val2"}` + *stagingLocation = "gs://testStagingLocation" + *minCPUPlatform = "testPlatform" + *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED" + *dataflowServiceOptions = "opt1,opt2" + + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + + *jobopts.Experiments = "use_runner_v2,use_portable_job_submission" + *jobopts.JobName = "testJob" + + opts, err := getJobOptions(context.Background(), true) + if err != nil { + t.Fatalf("getJobOptions() returned error %q, want %q", err, "nil") + } + if got, want := opts.Streaming, true; got != want { + t.Errorf("getJobOptions().Streaming = %t, want %t", got, want) + } if got, want := opts.Name, "testJob"; got != want { t.Errorf("getJobOptions().Name = %q, want %q", got, want) } - if got, want := len(opts.Experiments), 3; got != want { + if got, want := len(opts.Experiments), 7; got != want { t.Errorf("len(getJobOptions().Experiments) = %q, want %q", got, want) } else { sort.Strings(opts.Experiments) - expectedExperiments := []string{"min_cpu_platform=testPlatform", "use_portable_job_submission", "use_runner_v2"} + expectedExperiments := []string{"beam_fn_api", "enable_streaming_engine", "enable_windmill_service", "min_cpu_platform=testPlatform", "use_portable_job_submission", "use_runner_v2", "use_unified_worker"} for i := 0; i < 3; i++ { if got, want := opts.Experiments[i], expectedExperiments[i]; got != want { - t.Errorf("getJobOptions().Experiments = %q, want %q", got, want) + t.Errorf("getJobOptions().Experiments[%d] = %q, want %q", i, got, want) } } } @@ -109,30 +182,71 @@ func TestGetJobOptions_NoExperimentsSet(t *testing.T) { *gcpopts.Region = "testRegion" *jobopts.Experiments = "" - opts, err := getJobOptions(context.Background()) + opts, err := getJobOptions(context.Background(), false) + + if err != nil { + t.Fatalf("getJobOptions() returned error %q, want %q", err, "nil") + } + if 
got, want := len(opts.Experiments), 4; got != want {
+		t.Fatalf("len(getJobOptions().Experiments) = %q, want %q", got, want)
+	}
+	sort.Strings(opts.Experiments)
+	expectedExperiments := []string{"beam_fn_api", "use_portable_job_submission", "use_runner_v2", "use_unified_worker"}
+	for i := 0; i < 4; i++ {
+		if got, want := opts.Experiments[i], expectedExperiments[i]; got != want {
+			t.Errorf("getJobOptions().Experiments[%d] = %q, want %q", i, got, want)
+		}
+	}
+}
+
+func TestGetJobOptions_NoExperimentsSetStreaming(t *testing.T) {
+	resetGlobals()
+	*stagingLocation = "gs://testStagingLocation"
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+	*jobopts.Experiments = ""
+
+	opts, err := getJobOptions(context.Background(), true)
 
 	if err != nil {
 		t.Fatalf("getJobOptions() returned error %q, want %q", err, "nil")
 	}
-	if got, want := len(opts.Experiments), 2; got != want {
+	if got, want := len(opts.Experiments), 6; got != want {
 		t.Fatalf("len(getJobOptions().Experiments) = %q, want %q", got, want)
 	}
 	sort.Strings(opts.Experiments)
-	expectedExperiments := []string{"use_portable_job_submission", "use_unified_worker"}
+	expectedExperiments := []string{"beam_fn_api", "enable_streaming_engine", "enable_windmill_service", "use_portable_job_submission", "use_runner_v2", "use_unified_worker"}
 	for i := 0; i < 2; i++ {
 		if got, want := opts.Experiments[i], expectedExperiments[i]; got != want {
-			t.Errorf("getJobOptions().Experiments = %q, want %q", got, want)
+			t.Errorf("getJobOptions().Experiments[%d] = %q, want %q", i, got, want)
 		}
 	}
 }
 
+func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) {
+	resetGlobals()
+	*stagingLocation = "gs://testStagingLocation"
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+	*jobopts.Experiments = "disable_runner_v2"
+
+	opts, err := getJobOptions(context.Background(), false)
+
+	if err == nil {
+		t.Error("getJobOptions() returned error nil, want an error")
+	}
+	if opts != nil {
+		t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
+	}
+}
+
 func TestGetJobOptions_NoStagingLocation(t *testing.T) {
 	resetGlobals()
 	*stagingLocation = ""
 	*gcpopts.Project = "testProject"
 	*gcpopts.Region = "testRegion"
 
-	_, err := getJobOptions(context.Background())
+	_, err := getJobOptions(context.Background(), false)
 	if err == nil {
 		t.Fatalf("getJobOptions() returned error nil, want an error")
 	}
@@ -145,7 +259,7 @@ func TestGetJobOptions_InvalidAutoscaling(t *testing.T) {
 	*gcpopts.Project = "testProject"
 	*gcpopts.Region = "testRegion"
 
-	_, err := getJobOptions(context.Background())
+	_, err := getJobOptions(context.Background(), false)
 	if err == nil {
 		t.Fatalf("getJobOptions() returned error nil, want an error")
 	}
@@ -158,7 +272,7 @@ func TestGetJobOptions_InvalidRsGoal(t *testing.T) {
 	*gcpopts.Project = "testProject"
 	*gcpopts.Region = "testRegion"
 
-	_, err := getJobOptions(context.Background())
+	_, err := getJobOptions(context.Background(), false)
 	if err == nil {
 		t.Fatalf("getJobOptions() returned error nil, want an error")
 	}
@@ -204,7 +318,7 @@ func TestGetJobOptions_TransformMapping(t *testing.T) {
 	*update = true
 	*transformMapping = `{"transformOne": "transformTwo"}`
 
-	opts, err := getJobOptions(context.Background())
+	opts, err := getJobOptions(context.Background(), true)
 	if err != nil {
 		t.Errorf("getJobOptions() returned error, got %v", err)
 	}
@@ -217,6 +331,23 @@ func TestGetJobOptions_TransformMapping(t *testing.T) {
 
 }
 
+func TestGetJobOptions_TransformMappingNotStreaming(t *testing.T) {
+	
resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *update = true + *transformMapping = `{"transformOne": "transformTwo"}` + + opts, err := getJobOptions(context.Background(), false) + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + func TestGetJobOptions_TransformMappingNoUpdate(t *testing.T) { resetGlobals() *stagingLocation = "gs://testStagingLocation" @@ -224,7 +355,7 @@ func TestGetJobOptions_TransformMappingNoUpdate(t *testing.T) { *gcpopts.Region = "testRegion" *transformMapping = `{"transformOne": "transformTwo"}` - opts, err := getJobOptions(context.Background()) + opts, err := getJobOptions(context.Background(), true) if err == nil { t.Error("getJobOptions() returned error nil, want an error") } @@ -241,7 +372,7 @@ func TestGetJobOptions_InvalidMapping(t *testing.T) { *update = true *transformMapping = "not a JSON-encoded string" - opts, err := getJobOptions(context.Background()) + opts, err := getJobOptions(context.Background(), true) if err == nil { t.Error("getJobOptions() returned error nil, want an error") } diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go index 1cff2359d35a..67ed337bed57 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go @@ -35,7 +35,7 @@ import ( ) // Execute submits a pipeline as a Dataflow job. -func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, workerURL, jarURL, modelURL, endpoint string, async bool) (*dataflowPipelineResult, error) { +func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL, endpoint string, async bool) (*dataflowPipelineResult, error) { // (1) Upload Go binary to GCS. presult := &dataflowPipelineResult{} @@ -75,15 +75,6 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker return presult, err } - if opts.WorkerJar != "" { - log.Infof(ctx, "Staging Dataflow worker jar: %v", opts.WorkerJar) - - if _, err := stageFile(ctx, opts.Project, jarURL, opts.WorkerJar); err != nil { - return presult, err - } - log.Infof(ctx, "Staged worker jar: %v", jarURL) - } - // (2) Upload model to GCS log.Info(ctx, proto.MarshalTextString(raw)) @@ -94,7 +85,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker // (3) Translate to v1b3 and submit - job, err := Translate(ctx, raw, opts, workerURL, jarURL, modelURL) + job, err := Translate(ctx, raw, opts, workerURL, modelURL) if err != nil { return presult, err } diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index c8a00b58621f..5499347a2675 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -46,6 +46,7 @@ type JobOptions struct { // Pipeline options Options runtime.RawOptions + Streaming bool Project string Region string Zone string @@ -80,8 +81,6 @@ type JobOptions struct { // Worker is the worker binary override. Worker string - // WorkerJar is a custom worker jar. - WorkerJar string // -- Internal use only. Not supported in public Dataflow. 
-- @@ -89,14 +88,13 @@ type JobOptions struct { } // Translate translates a pipeline to a Dataflow job. -func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, jarURL, modelURL string) (*df.Job, error) { +func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string) (*df.Job, error) { // (1) Translate pipeline to v1b3 speak. jobType := "JOB_TYPE_BATCH" apiJobType := "FNAPI_BATCH" - streaming := !pipelinex.Bounded(p) - if streaming { + if opts.Streaming { jobType = "JOB_TYPE_STREAMING" apiJobType = "FNAPI_STREAMING" } @@ -114,16 +112,6 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker Name: "worker", Location: workerURL, }} - experiments := append(opts.Experiments, "beam_fn_api") - - if opts.WorkerJar != "" { - jar := &df.Package{ - Name: "dataflow-worker.jar", - Location: jarURL, - } - packages = append(packages, jar) - experiments = append(experiments, "use_staged_dataflow_worker_jar") - } for _, url := range opts.ArtifactURLs { name := url[strings.LastIndexAny(url, "/")+1:] @@ -166,7 +154,7 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker Options: dataflowOptions{ PipelineURL: modelURL, Region: opts.Region, - Experiments: experiments, + Experiments: opts.Experiments, TempLocation: opts.TempLocation, }, GoOptions: opts.Options, @@ -193,7 +181,7 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker WorkerRegion: opts.WorkerRegion, WorkerZone: opts.WorkerZone, TempStoragePrefix: opts.TempLocation, - Experiments: experiments, + Experiments: opts.Experiments, }, Labels: opts.Labels, TransformNameMapping: opts.TransformNameMapping, @@ -217,10 +205,6 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker if opts.TeardownPolicy != "" { workerPool.TeardownPolicy = opts.TeardownPolicy } - if streaming { - // Add separate data disk for streaming jobs - workerPool.DataDisks = []*df.Disk{{}} - } return job, nil } diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index 5d34f9c72c8a..d1048b49c0bd 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -28,7 +28,6 @@ task dataflowValidatesRunner() { group = "Verification" dependsOn ":sdks:go:test:goBuild" - dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar" dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" doLast { @@ -37,7 +36,6 @@ task dataflowValidatesRunner() { ] def options = [ "--runner dataflow", - "--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}", "--pipeline_opts \"${pipelineOptions.join(' ')}\"", ] exec { diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh index 0d0531c82347..444dc1ae39a7 100755 --- a/sdks/go/test/run_validatesrunner_tests.sh +++ b/sdks/go/test/run_validatesrunner_tests.sh @@ -65,9 +65,6 @@ # example in the format "us.gcr.io/". # --region -> GCP region to run Dataflow jobs on. # --gcs_location -> GCS URL for storing temporary files for Dataflow jobs. -# --dataflow_worker_jar -> The Dataflow worker jar to use when running jobs. -# If not specified, the script attempts to retrieve a previously built -# jar from the appropriate gradle module, which may not succeed. set -e trap '! 
[[ "$BASH_COMMAND" =~ ^(echo|read|if|ARGS|shift|SOCKET_SCRIPT|\[\[) ]] && \ @@ -162,11 +159,6 @@ case $key in shift # past argument shift # past value ;; - --dataflow_worker_jar) - DATAFLOW_WORKER_JAR="$2" - shift # past argument - shift # past value - ;; --flink_job_server_jar) FLINK_JOB_SERVER_JAR="$2" shift # past argument @@ -266,12 +258,7 @@ s.close() " # Set up environment based on runner. -if [[ "$RUNNER" == "dataflow" ]]; then - if [[ -z "$DATAFLOW_WORKER_JAR" ]]; then - DATAFLOW_WORKER_JAR=$(find $(pwd)/runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-fn-api-worker-*.jar) - fi - echo "Using Dataflow worker jar: $DATAFLOW_WORKER_JAR" -elif [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$RUNNER" == "portable" ]]; then +if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$RUNNER" == "portable" ]]; then if [[ -z "$ENDPOINT" ]]; then JOB_PORT=$(python3 -c "$SOCKET_SCRIPT") ENDPOINT="localhost:$JOB_PORT" @@ -417,7 +404,6 @@ ARGS="$ARGS --environment_type=DOCKER" ARGS="$ARGS --environment_config=$CONTAINER:$TAG" ARGS="$ARGS --staging_location=$GCS_LOCATION/staging-validatesrunner-test/$GCS_SUBFOLDER" ARGS="$ARGS --temp_location=$GCS_LOCATION/temp-validatesrunner-test/$GCS_SUBFOLDER" -ARGS="$ARGS --dataflow_worker_jar=$DATAFLOW_WORKER_JAR" ARGS="$ARGS --endpoint=$ENDPOINT" if [[ -n "$TEST_EXPANSION_ADDR" ]]; then ARGS="$ARGS --test_expansion_addr=$TEST_EXPANSION_ADDR"